b6c183dec91996d695bb6baa3ae1073a4653576b
[aai/esr-gui.git] /
1 "use strict";
2
3 var common = require('./common')
4         , utils = require('../utils')
5   , toError = require('../utils').toError
6         , f = require('util').format
7         , handleCallback = require('../utils').handleCallback
8         , shallowClone = utils.shallowClone
9   , WriteError = common.WriteError
10   , BulkWriteResult = common.BulkWriteResult
11   , LegacyOp = common.LegacyOp
12   , ObjectID = require('mongodb-core').BSON.ObjectID
13   , Define = require('../metadata')
14         , BSON = require('mongodb-core').BSON
15   , Batch = common.Batch
16   , mergeBatchResults = common.mergeBatchResults;
17
18 var bson = new BSON.BSONPure();
19
20 /**
21  * Create a FindOperatorsOrdered instance (INTERNAL TYPE, do not instantiate directly)
22  * @class
23  * @return {FindOperatorsOrdered} a FindOperatorsOrdered instance.
24  */
25 var FindOperatorsOrdered = function(self) {
26   this.s = self.s;
27 }
28
29 /**
30  * Add a single update document to the bulk operation
31  *
32  * @method
33  * @param {object} doc update operations
34  * @throws {MongoError}
35  * @return {OrderedBulkOperation}
36  */
37 FindOperatorsOrdered.prototype.update = function(updateDocument) {
38   // Perform upsert
39   var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
40
41   // Establish the update command
42   var document = {
43       q: this.s.currentOp.selector
44     , u: updateDocument
45     , multi: true
46     , upsert: upsert
47   }
48
49   // Clear out current Op
50   this.s.currentOp = null;
51   // Add the update document to the list
52   return addToOperationsList(this, common.UPDATE, document);
53 }
54
55 /**
56  * Add a single update one document to the bulk operation
57  *
58  * @method
59  * @param {object} doc update operations
60  * @throws {MongoError}
61  * @return {OrderedBulkOperation}
62  */
63 FindOperatorsOrdered.prototype.updateOne = function(updateDocument) {
64   // Perform upsert
65   var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
66
67   // Establish the update command
68   var document = {
69       q: this.s.currentOp.selector
70     , u: updateDocument
71     , multi: false
72     , upsert: upsert
73   }
74
75   // Clear out current Op
76   this.s.currentOp = null;
77   // Add the update document to the list
78   return addToOperationsList(this, common.UPDATE, document);
79 }
80
81 /**
82  * Add a replace one operation to the bulk operation
83  *
84  * @method
85  * @param {object} doc the new document to replace the existing one with
86  * @throws {MongoError}
87  * @return {OrderedBulkOperation}
88  */
89 FindOperatorsOrdered.prototype.replaceOne = function(updateDocument) {
90   this.updateOne(updateDocument);
91 }
92
93 /**
94  * Upsert modifier for update bulk operation
95  *
96  * @method
97  * @throws {MongoError}
98  * @return {FindOperatorsOrdered}
99  */
100 FindOperatorsOrdered.prototype.upsert = function() {
101   this.s.currentOp.upsert = true;
102   return this;
103 }
104
105 /**
106  * Add a remove one operation to the bulk operation
107  *
108  * @method
109  * @throws {MongoError}
110  * @return {OrderedBulkOperation}
111  */
112 FindOperatorsOrdered.prototype.deleteOne = function() {
113   // Establish the update command
114   var document = {
115       q: this.s.currentOp.selector
116     , limit: 1
117   }
118
119   // Clear out current Op
120   this.s.currentOp = null;
121   // Add the remove document to the list
122   return addToOperationsList(this, common.REMOVE, document);
123 }
124
125 // Backward compatibility
126 FindOperatorsOrdered.prototype.removeOne = FindOperatorsOrdered.prototype.deleteOne;
127
128 /**
129  * Add a remove operation to the bulk operation
130  *
131  * @method
132  * @throws {MongoError}
133  * @return {OrderedBulkOperation}
134  */
135 FindOperatorsOrdered.prototype.delete = function() {
136   // Establish the update command
137   var document = {
138       q: this.s.currentOp.selector
139     , limit: 0
140   }
141
142   // Clear out current Op
143   this.s.currentOp = null;
144   // Add the remove document to the list
145   return addToOperationsList(this, common.REMOVE, document);
146 }
147
148 // Backward compatibility
149 FindOperatorsOrdered.prototype.remove = FindOperatorsOrdered.prototype.delete;
150
151 // Add to internal list of documents
152 var addToOperationsList = function(_self, docType, document) {
153   // Get the bsonSize
154   var bsonSize = bson.calculateObjectSize(document, false);
155
156   // Throw error if the doc is bigger than the max BSON size
157   if(bsonSize >= _self.s.maxBatchSizeBytes) {
158                 throw toError("document is larger than the maximum size " + _self.s.maxBatchSizeBytes);
159         }
160
161   // Create a new batch object if we don't have a current one
162   if(_self.s.currentBatch == null) _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
163
164   // Check if we need to create a new batch
165   if(((_self.s.currentBatchSize + 1) >= _self.s.maxWriteBatchSize)
166     || ((_self.s.currentBatchSizeBytes +  _self.s.currentBatchSizeBytes) >= _self.s.maxBatchSizeBytes)
167     || (_self.s.currentBatch.batchType != docType)) {
168     // Save the batch to the execution stack
169     _self.s.batches.push(_self.s.currentBatch);
170
171     // Create a new batch
172     _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
173
174     // Reset the current size trackers
175     _self.s.currentBatchSize = 0;
176     _self.s.currentBatchSizeBytes = 0;
177   } else {
178     // Update current batch size
179     _self.s.currentBatchSize = _self.s.currentBatchSize + 1;
180     _self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
181   }
182
183   if(docType == common.INSERT) {
184     _self.s.bulkResult.insertedIds.push({index: _self.s.currentIndex, _id: document._id});
185   }
186
187   // We have an array of documents
188   if(Array.isArray(document)) {
189     throw toError("operation passed in cannot be an Array");
190   } else {
191     _self.s.currentBatch.originalIndexes.push(_self.s.currentIndex);
192     _self.s.currentBatch.operations.push(document)
193                 _self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
194     _self.s.currentIndex = _self.s.currentIndex + 1;
195   }
196
197   // Return self
198   return _self;
199 }
200
201 /**
202  * Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
203  * @class
204  * @property {number} length Get the number of operations in the bulk.
205  * @return {OrderedBulkOperation} a OrderedBulkOperation instance.
206  */
207 function OrderedBulkOperation(topology, collection, options) {
208         options = options == null ? {} : options;
209         // TODO Bring from driver information in isMaster
210         var self = this;
211         var executed = false;
212
213         // Current item
214         var currentOp = null;
215
216         // Handle to the bson serializer, used to calculate running sizes
217         var bson = topology.bson;
218
219         // Namespace for the operation
220   var namespace = collection.collectionName;
221
222   // Set max byte size
223         var maxBatchSizeBytes = topology.isMasterDoc && topology.isMasterDoc.maxBsonObjectSize
224     ? topology.isMasterDoc.maxBsonObjectSize : (1024*1025*16);
225         var maxWriteBatchSize = topology.isMasterDoc && topology.isMasterDoc.maxWriteBatchSize
226     ? topology.isMasterDoc.maxWriteBatchSize : 1000;
227
228   // Get the write concern
229   var writeConcern = common.writeConcern(shallowClone(options), collection, options);
230
231   // Get the promiseLibrary
232   var promiseLibrary = options.promiseLibrary;
233
234   // No promise library selected fall back
235   if(!promiseLibrary) {
236     promiseLibrary = typeof global.Promise == 'function' ?
237       global.Promise : require('es6-promise').Promise;
238   }
239
240   // Current batch
241   var currentBatch = null;
242   var currentIndex = 0;
243   var currentBatchSize = 0;
244   var currentBatchSizeBytes = 0;
245   var batches = [];
246
247   // Final results
248   var bulkResult = {
249           ok: 1
250     , writeErrors: []
251     , writeConcernErrors: []
252     , insertedIds: []
253     , nInserted: 0
254     , nUpserted: 0
255     , nMatched: 0
256     , nModified: 0
257     , nRemoved: 0
258     , upserted: []
259   };
260
261   // Internal state
262   this.s = {
263     // Final result
264       bulkResult: bulkResult
265     // Current batch state
266     , currentBatch: null
267     , currentIndex: 0
268     , currentBatchSize: 0
269     , currentBatchSizeBytes: 0
270     , batches: []
271     // Write concern
272     , writeConcern: writeConcern
273     // Max batch size options
274     , maxBatchSizeBytes: maxBatchSizeBytes
275     , maxWriteBatchSize: maxWriteBatchSize
276     // Namespace
277     , namespace: namespace
278     // BSON
279     , bson: bson
280     // Topology
281     , topology: topology
282     // Options
283     , options: options
284     // Current operation
285     , currentOp: currentOp
286     // Executed
287     , executed: executed
288     // Collection
289     , collection: collection
290     // Promise Library
291     , promiseLibrary: promiseLibrary
292                 // Fundamental error
293                 , err: null
294     // Bypass validation
295     , bypassDocumentValidation: typeof options.bypassDocumentValidation == 'boolean' ? options.bypassDocumentValidation : false
296   }
297 }
298
299 var define = OrderedBulkOperation.define = new Define('OrderedBulkOperation', OrderedBulkOperation, false);
300
301 OrderedBulkOperation.prototype.raw = function(op) {
302   var key = Object.keys(op)[0];
303
304   // Set up the force server object id
305   var forceServerObjectId = typeof this.s.options.forceServerObjectId == 'boolean'
306     ? this.s.options.forceServerObjectId : this.s.collection.s.db.options.forceServerObjectId;
307
308   // Update operations
309   if((op.updateOne && op.updateOne.q)
310     || (op.updateMany && op.updateMany.q)
311     || (op.replaceOne && op.replaceOne.q)) {
312     op[key].multi = op.updateOne || op.replaceOne ? false : true;
313     return addToOperationsList(this, common.UPDATE, op[key]);
314   }
315
316   // Crud spec update format
317   if(op.updateOne || op.updateMany || op.replaceOne) {
318     var multi = op.updateOne || op.replaceOne ? false : true;
319     var operation = {q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi}
320     operation.upsert = op[key].upsert ? true: false;
321                 if(op.collation) operation.collation = op.collation;
322     return addToOperationsList(this, common.UPDATE, operation);
323   }
324
325   // Remove operations
326   if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || op.deleteMany && op.deleteMany.q) {
327     op[key].limit = op.removeOne ? 1 : 0;
328     return addToOperationsList(this, common.REMOVE, op[key]);
329   }
330
331   // Crud spec delete operations, less efficient
332   if(op.deleteOne || op.deleteMany) {
333     var limit = op.deleteOne ? 1 : 0;
334     var operation = {q: op[key].filter, limit: limit}
335                 if(op.collation) operation.collation = op.collation;
336     return addToOperationsList(this, common.REMOVE, operation);
337   }
338
339   // Insert operations
340   if(op.insertOne && op.insertOne.document == null) {
341     if(forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
342     return addToOperationsList(this, common.INSERT, op.insertOne);
343   } else if(op.insertOne && op.insertOne.document) {
344     if(forceServerObjectId !== true && op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
345     return addToOperationsList(this, common.INSERT, op.insertOne.document);
346   }
347
348   if(op.insertMany) {
349     for(var i = 0; i < op.insertMany.length; i++) {
350       if(forceServerObjectId !== true && op.insertMany[i]._id == null) op.insertMany[i]._id = new ObjectID();
351       addToOperationsList(this, common.INSERT, op.insertMany[i]);
352     }
353
354     return;
355   }
356
357   // No valid type of operation
358   throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
359 }
360
361 /**
362  * Add a single insert document to the bulk operation
363  *
364  * @param {object} doc the document to insert
365  * @throws {MongoError}
366  * @return {OrderedBulkOperation}
367  */
368 OrderedBulkOperation.prototype.insert = function(document) {
369   if(this.s.collection.s.db.options.forceServerObjectId !== true && document._id == null) document._id = new ObjectID();
370   return addToOperationsList(this, common.INSERT, document);
371 }
372
373 /**
374  * Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
375  *
376  * @method
377  * @param {object} selector The selector for the bulk operation.
378  * @throws {MongoError}
379  * @return {FindOperatorsOrdered}
380  */
381 OrderedBulkOperation.prototype.find = function(selector) {
382   if (!selector) {
383     throw toError("Bulk find operation must specify a selector");
384   }
385
386   // Save a current selector
387   this.s.currentOp = {
388     selector: selector
389   }
390
391   return new FindOperatorsOrdered(this);
392 }
393
394 Object.defineProperty(OrderedBulkOperation.prototype, 'length', {
395   enumerable: true,
396   get: function() {
397     return this.s.currentIndex;
398   }
399 });
400
401 //
402 // Execute next write command in a chain
403 var executeCommands = function(self, callback) {
404   if(self.s.batches.length == 0) {
405     return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
406   }
407
408   // Ordered execution of the command
409   var batch = self.s.batches.shift();
410
411   var resultHandler = function(err, result) {
412                 // Error is a driver related error not a bulk op error, terminate
413                 if(err && err.driver || err && err.message) {
414                         return handleCallback(callback, err);
415                 }
416
417     // If we have and error
418     if(err) err.ok = 0;
419     // Merge the results together
420     var mergeResult = mergeBatchResults(true, batch, self.s.bulkResult, err, result);
421     if(mergeResult != null) {
422       return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
423     }
424
425     // If we are ordered and have errors and they are
426     // not all replication errors terminate the operation
427     if(self.s.bulkResult.writeErrors.length > 0) {
428       return handleCallback(callback, toError(self.s.bulkResult.writeErrors[0]), new BulkWriteResult(self.s.bulkResult));
429     }
430
431     // Execute the next command in line
432     executeCommands(self, callback);
433   }
434
435   var finalOptions = {ordered: true}
436   if(self.s.writeConcern != null) {
437     finalOptions.writeConcern = self.s.writeConcern;
438   }
439
440         // Set an operationIf if provided
441         if(self.operationId) {
442                 resultHandler.operationId = self.operationId;
443         }
444
445         // Serialize functions
446         if(self.s.options.serializeFunctions) {
447                 finalOptions.serializeFunctions = true
448         }
449
450   // Serialize functions
451   if(self.s.options.ignoreUndefined) {
452     finalOptions.ignoreUndefined = true
453   }
454
455   // Is the bypassDocumentValidation options specific
456   if(self.s.bypassDocumentValidation == true) {
457     finalOptions.bypassDocumentValidation = true;
458   }
459
460   try {
461     if(batch.batchType == common.INSERT) {
462       self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
463     } else if(batch.batchType == common.UPDATE) {
464       self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
465     } else if(batch.batchType == common.REMOVE) {
466       self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
467     }
468   } catch(err) {
469     // Force top level error
470     err.ok = 0;
471     // Merge top level error and return
472     handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
473   }
474 }
475
476 /**
477  * The callback format for results
478  * @callback OrderedBulkOperation~resultCallback
479  * @param {MongoError} error An error instance representing the error during the execution.
480  * @param {BulkWriteResult} result The bulk write result.
481  */
482
483 /**
484  * Execute the ordered bulk operation
485  *
486  * @method
487  * @param {object} [options=null] Optional settings.
488  * @param {(number|string)} [options.w=null] The write concern.
489  * @param {number} [options.wtimeout=null] The write concern timeout.
490  * @param {boolean} [options.j=false] Specify a journal write concern.
491  * @param {boolean} [options.fsync=false] Specify a file sync write concern.
492  * @param {OrderedBulkOperation~resultCallback} [callback] The result callback
493  * @throws {MongoError}
494  * @return {Promise} returns Promise if no callback passed
495  */
496 OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
497   var self = this;
498   if(this.s.executed) throw new toError("batch cannot be re-executed");
499   if(typeof _writeConcern == 'function') {
500     callback = _writeConcern;
501   } else {
502     this.s.writeConcern = _writeConcern;
503   }
504
505   // If we have current batch
506   if(this.s.currentBatch) this.s.batches.push(this.s.currentBatch)
507
508   // If we have no operations in the bulk raise an error
509   if(this.s.batches.length == 0) {
510     throw toError("Invalid Operation, No operations in bulk");
511   }
512
513   // Execute using callback
514   if(typeof callback == 'function') {
515                 return executeCommands(this, callback);
516         }
517
518   // Return a Promise
519   return new this.s.promiseLibrary(function(resolve, reject) {
520     executeCommands(self, function(err, r) {
521       if(err) return reject(err);
522       resolve(r);
523     });
524   });
525 }
526
527 define.classMethod('execute', {callback: true, promise:false});
528
529 /**
530  * Returns an unordered batch object
531  * @ignore
532  */
533 var initializeOrderedBulkOp = function(topology, collection, options) {
534         return new OrderedBulkOperation(topology, collection, options);
535 }
536
537 initializeOrderedBulkOp.OrderedBulkOperation = OrderedBulkOperation;
538 module.exports = initializeOrderedBulkOp;
539 module.exports.Bulk = OrderedBulkOperation;