ecb91dba217a48ef0cb462c96b210938ae962f96
[aai/esr-gui.git] /
1 "use strict";
2
3 var common = require('./common')
4         , utils = require('../utils')
5   , toError = require('../utils').toError
6   , f = require('util').format
7         , handleCallback = require('../utils').handleCallback
8   , shallowClone = utils.shallowClone
9   , WriteError = common.WriteError
10   , BulkWriteResult = common.BulkWriteResult
11   , LegacyOp = common.LegacyOp
12   , ObjectID = require('mongodb-core').BSON.ObjectID
13         , BSON = require('mongodb-core').BSON
14   , Define = require('../metadata')
15   , Batch = common.Batch
16   , mergeBatchResults = common.mergeBatchResults;
17
18 var bson = new BSON.BSONPure();
19
20 /**
21  * Create a FindOperatorsUnordered instance (INTERNAL TYPE, do not instantiate directly)
22  * @class
23  * @property {number} length Get the number of operations in the bulk.
24  * @return {FindOperatorsUnordered} a FindOperatorsUnordered instance.
25  */
26 var FindOperatorsUnordered = function(self) {
27   this.s = self.s;
28 }
29
30 /**
31  * Add a single update document to the bulk operation
32  *
33  * @method
34  * @param {object} doc update operations
35  * @throws {MongoError}
36  * @return {UnorderedBulkOperation}
37  */
38 FindOperatorsUnordered.prototype.update = function(updateDocument) {
39   // Perform upsert
40   var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
41
42   // Establish the update command
43   var document = {
44       q: this.s.currentOp.selector
45     , u: updateDocument
46     , multi: true
47     , upsert: upsert
48   }
49
50   // Clear out current Op
51   this.s.currentOp = null;
52   // Add the update document to the list
53   return addToOperationsList(this, common.UPDATE, document);
54 }
55
56 /**
57  * Add a single update one document to the bulk operation
58  *
59  * @method
60  * @param {object} doc update operations
61  * @throws {MongoError}
62  * @return {UnorderedBulkOperation}
63  */
64 FindOperatorsUnordered.prototype.updateOne = function(updateDocument) {
65   // Perform upsert
66   var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
67
68   // Establish the update command
69   var document = {
70       q: this.s.currentOp.selector
71     , u: updateDocument
72     , multi: false
73     , upsert: upsert
74   }
75
76   // Clear out current Op
77   this.s.currentOp = null;
78   // Add the update document to the list
79   return addToOperationsList(this, common.UPDATE, document);
80 }
81
82 /**
83  * Add a replace one operation to the bulk operation
84  *
85  * @method
86  * @param {object} doc the new document to replace the existing one with
87  * @throws {MongoError}
88  * @return {UnorderedBulkOperation}
89  */
90 FindOperatorsUnordered.prototype.replaceOne = function(updateDocument) {
91   this.updateOne(updateDocument);
92 }
93
94 /**
95  * Upsert modifier for update bulk operation
96  *
97  * @method
98  * @throws {MongoError}
99  * @return {UnorderedBulkOperation}
100  */
101 FindOperatorsUnordered.prototype.upsert = function() {
102   this.s.currentOp.upsert = true;
103   return this;
104 }
105
106 /**
107  * Add a remove one operation to the bulk operation
108  *
109  * @method
110  * @throws {MongoError}
111  * @return {UnorderedBulkOperation}
112  */
113 FindOperatorsUnordered.prototype.removeOne = function() {
114   // Establish the update command
115   var document = {
116       q: this.s.currentOp.selector
117     , limit: 1
118   }
119
120   // Clear out current Op
121   this.s.currentOp = null;
122   // Add the remove document to the list
123   return addToOperationsList(this, common.REMOVE, document);
124 }
125
126 /**
127  * Add a remove operation to the bulk operation
128  *
129  * @method
130  * @throws {MongoError}
131  * @return {UnorderedBulkOperation}
132  */
133 FindOperatorsUnordered.prototype.remove = function() {
134   // Establish the update command
135   var document = {
136       q: this.s.currentOp.selector
137     , limit: 0
138   }
139
140   // Clear out current Op
141   this.s.currentOp = null;
142   // Add the remove document to the list
143   return addToOperationsList(this, common.REMOVE, document);
144 }
145
146 //
147 // Add to the operations list
148 //
149 var addToOperationsList = function(_self, docType, document) {
150   // Get the bsonSize
151   var bsonSize = bson.calculateObjectSize(document, false);
152   // Throw error if the doc is bigger than the max BSON size
153   if(bsonSize >= _self.s.maxBatchSizeBytes) throw toError("document is larger than the maximum size " + _self.s.maxBatchSizeBytes);
154   // Holds the current batch
155   _self.s.currentBatch = null;
156   // Get the right type of batch
157   if(docType == common.INSERT) {
158     _self.s.currentBatch = _self.s.currentInsertBatch;
159   } else if(docType == common.UPDATE) {
160     _self.s.currentBatch = _self.s.currentUpdateBatch;
161   } else if(docType == common.REMOVE) {
162     _self.s.currentBatch = _self.s.currentRemoveBatch;
163   }
164
165   // Create a new batch object if we don't have a current one
166   if(_self.s.currentBatch == null) _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
167
168   // Check if we need to create a new batch
169   if(((_self.s.currentBatch.size + 1) >= _self.s.maxWriteBatchSize)
170     || ((_self.s.currentBatch.sizeBytes + bsonSize) >= _self.s.maxBatchSizeBytes)
171     || (_self.s.currentBatch.batchType != docType)) {
172     // Save the batch to the execution stack
173     _self.s.batches.push(_self.s.currentBatch);
174
175     // Create a new batch
176     _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
177   }
178
179   // We have an array of documents
180   if(Array.isArray(document)) {
181     throw toError("operation passed in cannot be an Array");
182   } else {
183     _self.s.currentBatch.operations.push(document);
184     _self.s.currentBatch.originalIndexes.push(_self.s.currentIndex);
185     _self.s.currentIndex = _self.s.currentIndex + 1;
186   }
187
188   // Save back the current Batch to the right type
189   if(docType == common.INSERT) {
190     _self.s.currentInsertBatch = _self.s.currentBatch;
191     _self.s.bulkResult.insertedIds.push({index: _self.s.currentIndex, _id: document._id});
192   } else if(docType == common.UPDATE) {
193     _self.s.currentUpdateBatch = _self.s.currentBatch;
194   } else if(docType == common.REMOVE) {
195     _self.s.currentRemoveBatch = _self.s.currentBatch;
196   }
197
198   // Update current batch size
199   _self.s.currentBatch.size = _self.s.currentBatch.size + 1;
200   _self.s.currentBatch.sizeBytes = _self.s.currentBatch.sizeBytes + bsonSize;
201
202   // Return self
203   return _self;
204 }
205
206 /**
207  * Create a new UnorderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
208  * @class
209  * @return {UnorderedBulkOperation} a UnorderedBulkOperation instance.
210  */
211 var UnorderedBulkOperation = function(topology, collection, options) {
212         options = options == null ? {} : options;
213
214         // Contains reference to self
215         var self = this;
216         // Get the namesspace for the write operations
217   var namespace = collection.collectionName;
218   // Used to mark operation as executed
219   var executed = false;
220
221         // Current item
222   // var currentBatch = null;
223         var currentOp = null;
224         var currentIndex = 0;
225   var batches = [];
226
227   // The current Batches for the different operations
228   var currentInsertBatch = null;
229   var currentUpdateBatch = null;
230   var currentRemoveBatch = null;
231
232         // Handle to the bson serializer, used to calculate running sizes
233         var bson = topology.bson;
234
235   // Set max byte size
236   var maxBatchSizeBytes = topology.isMasterDoc && topology.isMasterDoc.maxBsonObjectSize
237     ? topology.isMasterDoc.maxBsonObjectSize : (1024*1025*16);
238   var maxWriteBatchSize = topology.isMasterDoc && topology.isMasterDoc.maxWriteBatchSize
239     ? topology.isMasterDoc.maxWriteBatchSize : 1000;
240
241   // Get the write concern
242   var writeConcern = common.writeConcern(shallowClone(options), collection, options);
243
244   // Get the promiseLibrary
245   var promiseLibrary = options.promiseLibrary;
246
247   // No promise library selected fall back
248   if(!promiseLibrary) {
249     promiseLibrary = typeof global.Promise == 'function' ?
250       global.Promise : require('es6-promise').Promise;
251   }
252
253   // Final results
254   var bulkResult = {
255           ok: 1
256     , writeErrors: []
257     , writeConcernErrors: []
258     , insertedIds: []
259     , nInserted: 0
260     , nUpserted: 0
261     , nMatched: 0
262     , nModified: 0
263     , nRemoved: 0
264     , upserted: []
265   };
266
267   // Internal state
268   this.s = {
269     // Final result
270       bulkResult: bulkResult
271     // Current batch state
272     , currentInsertBatch: null
273     , currentUpdateBatch: null
274     , currentRemoveBatch: null
275     , currentBatch: null
276     , currentIndex: 0
277     , batches: []
278     // Write concern
279     , writeConcern: writeConcern
280     // Max batch size options
281     , maxBatchSizeBytes: maxBatchSizeBytes
282     , maxWriteBatchSize: maxWriteBatchSize
283     // Namespace
284     , namespace: namespace
285     // BSON
286     , bson: bson
287     // Topology
288     , topology: topology
289     // Options
290     , options: options
291     // Current operation
292     , currentOp: currentOp
293     // Executed
294     , executed: executed
295     // Collection
296     , collection: collection
297     // Promise Library
298     , promiseLibrary: promiseLibrary
299     // Bypass validation
300     , bypassDocumentValidation: typeof options.bypassDocumentValidation == 'boolean' ? options.bypassDocumentValidation : false
301   }
302 }
303
304 var define = UnorderedBulkOperation.define = new Define('UnorderedBulkOperation', UnorderedBulkOperation, false);
305
306 /**
307  * Add a single insert document to the bulk operation
308  *
309  * @param {object} doc the document to insert
310  * @throws {MongoError}
311  * @return {UnorderedBulkOperation}
312  */
313 UnorderedBulkOperation.prototype.insert = function(document) {
314   if(this.s.collection.s.db.options.forceServerObjectId !== true && document._id == null) document._id = new ObjectID();
315   return addToOperationsList(this, common.INSERT, document);
316 }
317
318 /**
319  * Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
320  *
321  * @method
322  * @param {object} selector The selector for the bulk operation.
323  * @throws {MongoError}
324  * @return {FindOperatorsUnordered}
325  */
326 UnorderedBulkOperation.prototype.find = function(selector) {
327   if (!selector) {
328     throw toError("Bulk find operation must specify a selector");
329   }
330
331   // Save a current selector
332   this.s.currentOp = {
333     selector: selector
334   }
335
336   return new FindOperatorsUnordered(this);
337 }
338
339 Object.defineProperty(UnorderedBulkOperation.prototype, 'length', {
340   enumerable: true,
341   get: function() {
342     return this.s.currentIndex;
343   }
344 });
345
346 UnorderedBulkOperation.prototype.raw = function(op) {
347   var key = Object.keys(op)[0];
348
349   // Set up the force server object id
350   var forceServerObjectId = typeof this.s.options.forceServerObjectId == 'boolean'
351     ? this.s.options.forceServerObjectId : this.s.collection.s.db.options.forceServerObjectId;
352
353   // Update operations
354   if((op.updateOne && op.updateOne.q)
355     || (op.updateMany && op.updateMany.q)
356     || (op.replaceOne && op.replaceOne.q)) {
357     op[key].multi = op.updateOne || op.replaceOne ? false : true;
358     return addToOperationsList(this, common.UPDATE, op[key]);
359   }
360
361   // Crud spec update format
362   if(op.updateOne || op.updateMany || op.replaceOne) {
363     var multi = op.updateOne || op.replaceOne ? false : true;
364     var operation = {q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi}
365     if(op[key].upsert) operation.upsert = true;
366     return addToOperationsList(this, common.UPDATE, operation);
367   }
368
369   // Remove operations
370   if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || op.deleteMany && op.deleteMany.q) {
371     op[key].limit = op.removeOne ? 1 : 0;
372     return addToOperationsList(this, common.REMOVE, op[key]);
373   }
374
375   // Crud spec delete operations, less efficient
376   if(op.deleteOne || op.deleteMany) {
377     var limit = op.deleteOne ? 1 : 0;
378     var operation = {q: op[key].filter, limit: limit}
379     return addToOperationsList(this, common.REMOVE, operation);
380   }
381
382   // Insert operations
383   if(op.insertOne && op.insertOne.document == null) {
384     if(forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
385     return addToOperationsList(this, common.INSERT, op.insertOne);
386   } else if(op.insertOne && op.insertOne.document) {
387     if(forceServerObjectId !== true && op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
388     return addToOperationsList(this, common.INSERT, op.insertOne.document);
389   }
390
391   if(op.insertMany) {
392     for(var i = 0; i < op.insertMany.length; i++) {
393       if(forceServerObjectId !== true && op.insertMany[i]._id == null) op.insertMany[i]._id = new ObjectID();
394       addToOperationsList(this, common.INSERT, op.insertMany[i]);
395     }
396
397     return;
398   }
399
400   // No valid type of operation
401   throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
402 }
403
404 //
405 // Execute the command
406 var executeBatch = function(self, batch, callback) {
407   var finalOptions = {ordered: false}
408   if(self.s.writeConcern != null) {
409     finalOptions.writeConcern = self.s.writeConcern;
410   }
411
412   var resultHandler = function(err, result) {
413                 // Error is a driver related error not a bulk op error, terminate
414                 if(err && err.driver || err && err.message) {
415                         return handleCallback(callback, err);
416                 }
417
418     // If we have and error
419     if(err) err.ok = 0;
420     handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
421   }
422
423         // Set an operationIf if provided
424         if(self.operationId) {
425                 resultHandler.operationId = self.operationId;
426         }
427
428         // Serialize functions
429         if(self.s.options.serializeFunctions) {
430                 finalOptions.serializeFunctions = true
431         }
432
433   // Is the bypassDocumentValidation options specific
434   if(self.s.bypassDocumentValidation == true) {
435     finalOptions.bypassDocumentValidation = true;
436   }
437
438   try {
439     if(batch.batchType == common.INSERT) {
440       self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
441     } else if(batch.batchType == common.UPDATE) {
442       self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
443     } else if(batch.batchType == common.REMOVE) {
444       self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
445     }
446   } catch(err) {
447     // Force top level error
448     err.ok = 0;
449     // Merge top level error and return
450     handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
451   }
452 }
453
454 //
455 // Execute all the commands
456 var executeBatches = function(self, callback) {
457   var numberOfCommandsToExecute = self.s.batches.length;
458         var error = null;
459   // Execute over all the batches
460   for(var i = 0; i < self.s.batches.length; i++) {
461     executeBatch(self, self.s.batches[i], function(err, result) {
462                         // Driver layer error capture it
463                         if(err) error = err;
464                         // Count down the number of commands left to execute
465       numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
466
467       // Execute
468       if(numberOfCommandsToExecute == 0) {
469                                 // Driver level error
470                                 if(error) return handleCallback(callback, error);
471                                 // Treat write errors
472         var error = self.s.bulkResult.writeErrors.length > 0 ? toError(self.s.bulkResult.writeErrors[0]) : null;
473         handleCallback(callback, error, new BulkWriteResult(self.s.bulkResult));
474       }
475     });
476   }
477 }
478
479 /**
480  * The callback format for results
481  * @callback UnorderedBulkOperation~resultCallback
482  * @param {MongoError} error An error instance representing the error during the execution.
483  * @param {BulkWriteResult} result The bulk write result.
484  */
485
486 /**
487  * Execute the ordered bulk operation
488  *
489  * @method
490  * @param {object} [options=null] Optional settings.
491  * @param {(number|string)} [options.w=null] The write concern.
492  * @param {number} [options.wtimeout=null] The write concern timeout.
493  * @param {boolean} [options.j=false] Specify a journal write concern.
494  * @param {boolean} [options.fsync=false] Specify a file sync write concern.
495  * @param {UnorderedBulkOperation~resultCallback} [callback] The result callback
496  * @throws {MongoError}
497  * @return {Promise} returns Promise if no callback passed
498  */
499 UnorderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
500   var self = this;
501   if(this.s.executed) throw toError("batch cannot be re-executed");
502   if(typeof _writeConcern == 'function') {
503     callback = _writeConcern;
504   } else {
505     this.s.writeConcern = _writeConcern;
506   }
507
508   // If we have current batch
509   if(this.s.currentInsertBatch) this.s.batches.push(this.s.currentInsertBatch);
510   if(this.s.currentUpdateBatch) this.s.batches.push(this.s.currentUpdateBatch);
511   if(this.s.currentRemoveBatch) this.s.batches.push(this.s.currentRemoveBatch);
512
513   // If we have no operations in the bulk raise an error
514   if(this.s.batches.length == 0) {
515     throw toError("Invalid Operation, No operations in bulk");
516   }
517
518   // Execute using callback
519   if(typeof callback == 'function') return executeBatches(this, callback);
520
521   // Return a Promise
522   return new this.s.promiseLibrary(function(resolve, reject) {
523     executeBatches(self, function(err, r) {
524       if(err) return reject(err);
525       resolve(r);
526     });
527   });
528 }
529
530 define.classMethod('execute', {callback: true, promise:false});
531
532 /**
533  * Returns an unordered batch object
534  * @ignore
535  */
536 var initializeUnorderedBulkOp = function(topology, collection, options) {
537         return new UnorderedBulkOperation(topology, collection, options);
538 }
539
540 initializeUnorderedBulkOp.UnorderedBulkOperation = UnorderedBulkOperation;
541 module.exports = initializeUnorderedBulkOp;
542 module.exports.Bulk = UnorderedBulkOperation;