abfeb975464a5f4a8cb7cbd7728c25a11f177b0c
[aai/esr-gui.git] /
1 "use strict";
2
3 var Insert = require('./commands').Insert
4   , Update = require('./commands').Update
5   , Remove = require('./commands').Remove
6   , Query = require('../connection/commands').Query
7   , copy = require('../connection/utils').copy
8   , KillCursor = require('../connection/commands').KillCursor
9   , GetMore = require('../connection/commands').GetMore
10   , Query = require('../connection/commands').Query
11   , ReadPreference = require('../topologies/read_preference')
12   , f = require('util').format
13   , CommandResult = require('../connection/command_result')
14   , MongoError = require('../error')
15   , Long = require('bson').Long
16   , getReadPreference = require('./shared').getReadPreference;
17
18 // Write concern fields
19 var writeConcernFields = ['w', 'wtimeout', 'j', 'fsync'];
20
21 var WireProtocol = function() {}
22
23 //
24 // Needs to support legacy mass insert as well as ordered/unordered legacy
25 // emulation
26 //
27 WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
28   options = options || {};
29   // Default is ordered execution
30   var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
31   var legacy = typeof options.legacy == 'boolean' ? options.legacy : false;
32   ops = Array.isArray(ops) ? ops :[ops];
33
34   // If we have more than a 1000 ops fails
35   if(ops.length > 1000) return callback(new MongoError("exceeded maximum write batch size of 1000"));
36
37   // Write concern
38   var writeConcern = options.writeConcern || {w:1};
39
40   // We are unordered
41   if(!ordered || writeConcern.w == 0) {
42     return executeUnordered('insert', Insert, ismaster, ns, bson, pool, ops, options, callback);
43   }
44
45   return executeOrdered('insert', Insert, ismaster, ns, bson, pool, ops, options, callback);
46 }
47
48 WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
49   options = options || {};
50   // Default is ordered execution
51   var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
52   ops = Array.isArray(ops) ? ops :[ops];
53
54   // Write concern
55   var writeConcern = options.writeConcern || {w:1};
56
57   // We are unordered
58   if(!ordered || writeConcern.w == 0) {
59     return executeUnordered('update', Update, ismaster, ns, bson, pool, ops, options, callback);
60   }
61
62   return executeOrdered('update', Update, ismaster, ns, bson, pool, ops, options, callback);
63 }
64
65 WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
66   options = options || {};
67   // Default is ordered execution
68   var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
69   ops = Array.isArray(ops) ? ops :[ops];
70
71   // Write concern
72   var writeConcern = options.writeConcern || {w:1};
73
74   // We are unordered
75   if(!ordered || writeConcern.w == 0) {
76     return executeUnordered('remove', Remove, ismaster, ns, bson, pool, ops, options, callback);
77   }
78
79   return executeOrdered('remove', Remove, ismaster, ns, bson, pool, ops, options, callback);
80 }
81
82 WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
83   // Create a kill cursor command
84   var killCursor = new KillCursor(bson, [cursorId]);
85   // Execute the kill cursor command
86   if(pool && pool.isConnected()) {
87     pool.write(killCursor, {
88       immediateRelease:true, noResponse: true
89     });
90   }
91
92   // Callback
93   if(typeof callback == 'function') callback(null, null);
94 }
95
96 WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
97   // Create getMore command
98   var getMore = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});
99
100   // Query callback
101   var queryCallback = function(err, result) {
102     if(err) return callback(err);
103     // Get the raw message
104     var r = result.message;
105
106     // If we have a timed out query or a cursor that was killed
107     if((r.responseFlags & (1 << 0)) != 0) {
108       return callback(new MongoError("cursor does not exist, was killed or timed out"), null);
109     }
110
111     // Ensure we have a Long valie cursor id
112     var cursorId = typeof r.cursorId == 'number'
113       ? Long.fromNumber(r.cursorId)
114       : r.cursorId;
115
116     // Set all the values
117     cursorState.documents = r.documents;
118     cursorState.cursorId = cursorId;
119
120     // Return
121     callback(null, null, r.connection);
122   }
123
124   // If we have a raw query decorate the function
125   if(raw) {
126     queryCallback.raw = raw;
127   }
128
129   // Check if we need to promote longs
130   if(typeof cursorState.promoteLongs == 'boolean') {
131     queryCallback.promoteLongs = cursorState.promoteLongs;
132   }
133
134   if(typeof cursorState.promoteValues == 'boolean') {
135     queryCallback.promoteValues = cursorState.promoteValues;
136   }
137
138   if(typeof cursorState.promoteBuffers == 'boolean') {
139     queryCallback.promoteBuffers = cursorState.promoteBuffers;
140   }
141
142   // Write out the getMore command
143   connection.write(getMore, queryCallback);
144 }
145
146 WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
147   // Establish type of command
148   if(cmd.find) {
149     return setupClassicFind(bson, ns, cmd, cursorState, topology, options)
150   } else if(cursorState.cursorId != null) {
151   } else if(cmd) {
152     return setupCommand(bson, ns, cmd, cursorState, topology, options);
153   } else {
154     throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
155   }
156 }
157
158 //
159 // Execute a find command
160 var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
161   // Ensure we have at least some options
162   options = options || {};
163   // Get the readPreference
164   var readPreference = getReadPreference(cmd, options);
165   // Set the optional batchSize
166   cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
167   var numberToReturn = 0;
168
169   // Unpack the limit and batchSize values
170   if(cursorState.limit == 0) {
171     numberToReturn = cursorState.batchSize;
172   } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
173     numberToReturn = cursorState.limit;
174   } else {
175     numberToReturn = cursorState.batchSize;
176   }
177
178   var numberToSkip = cursorState.skip || 0;
179   // Build actual find command
180   var findCmd = {};
181   // Using special modifier
182   var usesSpecialModifier = false;
183
184   // We have a Mongos topology, check if we need to add a readPreference
185   if(topology.type == 'mongos' && readPreference) {
186     findCmd['$readPreference'] = readPreference.toJSON();
187     usesSpecialModifier = true;
188   }
189
190   // Add special modifiers to the query
191   if(cmd.sort) findCmd['orderby'] = cmd.sort, usesSpecialModifier = true;
192   if(cmd.hint) findCmd['$hint'] = cmd.hint, usesSpecialModifier = true;
193   if(cmd.snapshot) findCmd['$snapshot'] = cmd.snapshot, usesSpecialModifier = true;
194   if(cmd.returnKey) findCmd['$returnKey'] = cmd.returnKey, usesSpecialModifier = true;
195   if(cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan, usesSpecialModifier = true;
196   if(cmd.min) findCmd['$min'] = cmd.min, usesSpecialModifier = true;
197   if(cmd.max) findCmd['$max'] = cmd.max, usesSpecialModifier = true;
198   if(cmd.showDiskLoc) findCmd['$showDiskLoc'] = cmd.showDiskLoc, usesSpecialModifier = true;
199   if(cmd.comment) findCmd['$comment'] = cmd.comment, usesSpecialModifier = true;
200   if(cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS, usesSpecialModifier = true;
201
202   if(cmd.explain) {
203         // nToReturn must be 0 (match all) or negative (match N and close cursor)
204         // nToReturn > 0 will give explain results equivalent to limit(0)
205     numberToReturn = -Math.abs(cmd.limit || 0);
206     usesSpecialModifier = true;
207     findCmd['$explain'] = true;
208   }
209
210   // If we have a special modifier
211   if(usesSpecialModifier) {
212     findCmd['$query'] = cmd.query;
213   } else {
214     findCmd = cmd.query;
215   }
216
217   // Throw on majority readConcern passed in
218   if(cmd.readConcern && cmd.readConcern.level != 'local') {
219     throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
220   }
221
222   // Remove readConcern, ensure no failing commands
223   if(cmd.readConcern) {
224     cmd = copy(cmd);
225     delete cmd['readConcern'];
226   }
227
228   // Set up the serialize and ignoreUndefined fields
229   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
230     ? options.serializeFunctions : false;
231   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
232     ? options.ignoreUndefined : false;
233
234   // Build Query object
235   var query = new Query(bson, ns, findCmd, {
236       numberToSkip: numberToSkip, numberToReturn: numberToReturn
237     , checkKeys: false, returnFieldSelector: cmd.fields
238     , serializeFunctions: serializeFunctions, ignoreUndefined: ignoreUndefined
239   });
240
241   // Set query flags
242   query.slaveOk = readPreference.slaveOk();
243
244   // Set up the option bits for wire protocol
245   if(typeof cmd.tailable == 'boolean') query.tailable = cmd.tailable;
246   if(typeof cmd.oplogReplay == 'boolean') query.oplogReplay = cmd.oplogReplay;
247   if(typeof cmd.noCursorTimeout == 'boolean') query.noCursorTimeout = cmd.noCursorTimeout;
248   if(typeof cmd.awaitData == 'boolean') query.awaitData = cmd.awaitData;
249   if(typeof cmd.partial == 'boolean') query.partial = cmd.partial;
250   // Return the query
251   return query;
252 }
253
254 //
255 // Set up a command cursor
256 var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
257   // Set empty options object
258   options = options || {}
259   // Get the readPreference
260   var readPreference = getReadPreference(cmd, options);
261   // Final query
262   var finalCmd = {};
263   for(var name in cmd) {
264     finalCmd[name] = cmd[name];
265   }
266
267   // Build command namespace
268   var parts = ns.split(/\./);
269
270   // Throw on majority readConcern passed in
271   if(cmd.readConcern && cmd.readConcern.level != 'local') {
272     throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
273   }
274
275   // Remove readConcern, ensure no failing commands
276   if(cmd.readConcern) delete cmd['readConcern'];
277
278   // Serialize functions
279   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
280     ? options.serializeFunctions : false;
281
282   // Set up the serialize and ignoreUndefined fields
283   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
284     ? options.ignoreUndefined : false;
285
286   // We have a Mongos topology, check if we need to add a readPreference
287   if(topology.type == 'mongos'
288     && readPreference
289     && readPreference.preference != 'primary') {
290     finalCmd = {
291       '$query': finalCmd,
292       '$readPreference': readPreference.toJSON()
293     };
294   }
295
296   // Build Query object
297   var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
298       numberToSkip: 0, numberToReturn: -1
299     , checkKeys: false, serializeFunctions: serializeFunctions
300     , ignoreUndefined: ignoreUndefined
301   });
302
303   // Set query flags
304   query.slaveOk = readPreference.slaveOk();
305
306   // Return the query
307   return query;
308 }
309
310 var hasWriteConcern = function(writeConcern) {
311   if(writeConcern.w
312     || writeConcern.wtimeout
313     || writeConcern.j == true
314     || writeConcern.fsync == true
315     || Object.keys(writeConcern).length == 0) {
316     return true;
317   }
318   return false;
319 }
320
321 var cloneWriteConcern = function(writeConcern) {
322   var wc = {};
323   if(writeConcern.w != null) wc.w = writeConcern.w;
324   if(writeConcern.wtimeout != null) wc.wtimeout = writeConcern.wtimeout;
325   if(writeConcern.j != null) wc.j = writeConcern.j;
326   if(writeConcern.fsync != null) wc.fsync = writeConcern.fsync;
327   return wc;
328 }
329
330 //
331 // Aggregate up all the results
332 //
333 var aggregateWriteOperationResults = function(opType, ops, results, connection) {
334   var finalResult = { ok: 1, n: 0 }
335   if(opType == 'update') {
336     finalResult.nModified = 0;
337     n: 0;
338   };
339
340   // Map all the results coming back
341   for(var i = 0; i < results.length; i++) {
342     var result = results[i];
343     var op = ops[i];
344
345     if((result.upserted || (result.updatedExisting == false)) && finalResult.upserted == null) {
346       finalResult.upserted = [];
347     }
348
349     // Push the upserted document to the list of upserted values
350     if(result.upserted) {
351       finalResult.upserted.push({index: i, _id: result.upserted});
352     }
353
354     // We have an upsert where we passed in a _id
355     if(result.updatedExisting == false && result.n == 1 && result.upserted == null) {
356       finalResult.upserted.push({index: i, _id: op.q._id});
357     } else if(result.updatedExisting == true) {
358       finalResult.nModified += result.n;
359     }
360
361     // We have an insert command
362     if(result.ok == 1 && opType == 'insert' && result.err == null) {
363       finalResult.n = finalResult.n + 1;
364     }
365
366     // We have a command error
367     if(result != null && result.ok == 0 || result.err || result.errmsg) {
368       if(result.ok == 0) finalResult.ok = 0;
369       finalResult.code = result.code;
370       finalResult.errmsg = result.errmsg || result.err || result.errMsg;
371
372       // Check if we have a write error
373       if(result.code == 11000
374         || result.code == 11001
375         || result.code == 12582
376         || result.code == 16544
377         || result.code == 16538
378         || result.code == 16542
379         || result.code == 14
380         || result.code == 13511) {
381         if(finalResult.writeErrors == null) finalResult.writeErrors = [];
382         finalResult.writeErrors.push({
383             index: i
384           , code: result.code
385           , errmsg: result.errmsg || result.err || result.errMsg
386         });
387       } else {
388         finalResult.writeConcernError = {
389             code: result.code
390           , errmsg: result.errmsg || result.err || result.errMsg
391         }
392       }
393     } else if(typeof result.n == 'number') {
394       finalResult.n += result.n;
395     } else {
396       finalResult.n += 1;
397     }
398
399     // Result as expected
400     if(result != null && result.lastOp) finalResult.lastOp = result.lastOp;
401   }
402
403   // Return finalResult aggregated results
404   return new CommandResult(finalResult, connection);
405 }
406
407 //
408 // Execute all inserts in an ordered manner
409 //
410 var executeOrdered = function(opType ,command, ismaster, ns, bson, pool, ops, options, callback) {
411   var _ops = ops.slice(0);
412   // Collect all the getLastErrors
413   var getLastErrors = [];
414   // Execute an operation
415   var executeOp = function(list, _callback) {
416     // No more items in the list
417     if(list.length == 0) {
418       return process.nextTick(function() {
419         _callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, null));
420       });
421     }
422
423     // Get the first operation
424     var doc = list.shift();
425     // Create an insert command
426     var op = new command(Query.getRequestId(), ismaster, bson, ns, [doc], options);
427     // Write concern
428     var optionWriteConcern = options.writeConcern || {w:1};
429     // Final write concern
430     var writeConcern = cloneWriteConcern(optionWriteConcern);
431
432     // Get the db name
433     var db = ns.split('.').shift();
434
435     try {
436       // Add binary message to list of commands to execute
437       var commands = [op];
438
439       // Add getLastOrdered
440       var getLastErrorCmd = {getlasterror: 1};
441       // Merge all the fields
442       for(var i = 0; i < writeConcernFields.length; i++) {
443         if(writeConcern[writeConcernFields[i]] != null) {
444           getLastErrorCmd[writeConcernFields[i]] = writeConcern[writeConcernFields[i]];
445         }
446       }
447
448       // Create a getLastError command
449       var getLastErrorOp = new Query(bson, f("%s.$cmd", db), getLastErrorCmd, {numberToReturn: -1});
450       // Add getLastError command to list of ops to execute
451       commands.push(getLastErrorOp);
452
453       // getLastError callback
454       var getLastErrorCallback = function(err, result) {
455         if(err) return callback(err);
456         // Get the document
457         var doc = result.result;
458         // Save the getLastError document
459         getLastErrors.push(doc);
460
461         // If we have an error terminate
462         if(doc.ok == 0 || doc.err || doc.errmsg) {
463           return callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, result.connection));
464         }
465
466         // Execute the next op in the list
467         executeOp(list, callback);
468       }
469
470       // Write both commands out at the same time
471       pool.write(commands, getLastErrorCallback);
472     } catch(err) {
473       if(typeof err == 'string') err = new MongoError(err);
474       // We have a serialization error, rewrite as a write error to have same behavior as modern
475       // write commands
476       getLastErrors.push({ ok: 1, errmsg: err.message, code: 14 });
477       // Return due to an error
478       process.nextTick(function() {
479         _callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, null));
480       });
481     }
482   }
483
484   // Execute the operations
485   executeOp(_ops, callback);
486 }
487
488 var executeUnordered = function(opType, command, ismaster, ns, bson, pool, ops, options, callback) {
489   // Total operations to write
490   var totalOps = ops.length;
491   // Collect all the getLastErrors
492   var getLastErrors = [];
493   // Write concern
494   var optionWriteConcern = options.writeConcern || {w:1};
495   // Final write concern
496   var writeConcern = cloneWriteConcern(optionWriteConcern);
497   // Driver level error
498   var error;
499
500   // Execute all the operations
501   for(var i = 0; i < ops.length; i++) {
502     // Create an insert command
503     var op = new command(Query.getRequestId(), ismaster, bson, ns, [ops[i]], options);
504     // Get db name
505     var db = ns.split('.').shift();
506
507     try {
508       // Add binary message to list of commands to execute
509       var commands = [op];
510
511       // If write concern 0 don't fire getLastError
512       if(hasWriteConcern(writeConcern)) {
513         var getLastErrorCmd = {getlasterror: 1};
514         // Merge all the fields
515         for(var j = 0; j < writeConcernFields.length; j++) {
516           if(writeConcern[writeConcernFields[j]] != null)
517             getLastErrorCmd[writeConcernFields[j]] = writeConcern[writeConcernFields[j]];
518         }
519
520         // Create a getLastError command
521         var getLastErrorOp = new Query(bson, f("%s.$cmd", db), getLastErrorCmd, {numberToReturn: -1});
522         // Add getLastError command to list of ops to execute
523         commands.push(getLastErrorOp);
524
525         // Give the result from getLastError the right index
526         var callbackOp = function(_index) {
527           return function(err, result) {
528             if(err) error = err;
529             // Update the number of operations executed
530             totalOps = totalOps - 1;
531             // Save the getLastError document
532             if(!err) getLastErrors[_index] = result.result;
533             // Check if we are done
534             if(totalOps == 0) {
535               process.nextTick(function() {
536                 if(error) return callback(error);
537                 callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, result.connection));
538               });
539             }
540           }
541         }
542
543         // Write both commands out at the same time
544         pool.write(commands, callbackOp(i));
545       } else {
546         pool.write(commands, {immediateRelease:true, noResponse:true});
547       }
548     } catch(err) {
549       if(typeof err == 'string') err = new MongoError(err);
550       // Update the number of operations executed
551       totalOps = totalOps - 1;
552       // We have a serialization error, rewrite as a write error to have same behavior as modern
553       // write commands
554       getLastErrors[i] = { ok: 1, errmsg: err.message, code: 14 };
555       // Check if we are done
556       if(totalOps == 0) {
557         callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, null));
558       }
559     }
560   }
561
562   // Empty w:0 return
563   if(writeConcern
564     && writeConcern.w == 0 && callback) {
565     callback(null, new CommandResult({ok:1}, null));
566   }
567 }
568
569 module.exports = WireProtocol;