3 var Insert = require('./commands').Insert
4 , Update = require('./commands').Update
5 , Remove = require('./commands').Remove
6 , Query = require('../connection/commands').Query
7 , copy = require('../connection/utils').copy
8 , KillCursor = require('../connection/commands').KillCursor
9 , GetMore = require('../connection/commands').GetMore
10 , Query = require('../connection/commands').Query
11 , ReadPreference = require('../topologies/read_preference')
12 , f = require('util').format
13 , CommandResult = require('../connection/command_result')
14 , MongoError = require('../error')
15 , Long = require('bson').Long
16 , getReadPreference = require('./shared').getReadPreference;
// Write concern fields: the write concern options that the write executors
// in this file copy into the getLastError command sent after each write.
var writeConcernFields = ['w', 'wtimeout', 'j', 'fsync'];

// Wire protocol implementation. The constructor keeps no state; every
// operation is a prototype method that receives what it needs as arguments.
var WireProtocol = function() {};
//
// Needs to support legacy mass insert as well as ordered/unordered legacy
// emulation
//
// Inserts one document or an array of documents into namespace `ns`.
//
//   pool, ismaster, bson - forwarded unchanged to the write executor
//   ns                   - namespace string
//   ops                  - a single document or an array of documents
//   options              - optional; `ordered` (default true) and
//                          `writeConcern` (default {w:1}) are read here
//   callback             - receives (err) when the batch is too large,
//                          otherwise it is handed to the executor
//
WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
  options = options || {};
  // Default is ordered execution
  var ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
  ops = Array.isArray(ops) ? ops : [ops];

  // If we have more than a 1000 ops fails
  if(ops.length > 1000) return callback(new MongoError("exceeded maximum write batch size of 1000"));

  // Write concern
  var writeConcern = options.writeConcern || {w:1};

  // Unordered batches and w:0 writes go through the unordered executor
  if(!ordered || writeConcern.w == 0) {
    return executeUnordered('insert', Insert, ismaster, ns, bson, pool, ops, options, callback);
  }

  return executeOrdered('insert', Insert, ismaster, ns, bson, pool, ops, options, callback);
};
//
// Applies one update operation or an array of update operations to `ns`.
//
//   pool, ismaster, bson - forwarded unchanged to the write executor
//   ops                  - a single update op or an array of them
//   options              - optional; `ordered` (default true) and
//                          `writeConcern` (default {w:1}) are read here
//   callback             - handed to the executor
//
WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
  options = options || {};
  // Default is ordered execution
  var ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
  ops = Array.isArray(ops) ? ops : [ops];

  // Write concern
  var writeConcern = options.writeConcern || {w:1};

  // Unordered batches and w:0 writes go through the unordered executor
  if(!ordered || writeConcern.w == 0) {
    return executeUnordered('update', Update, ismaster, ns, bson, pool, ops, options, callback);
  }

  return executeOrdered('update', Update, ismaster, ns, bson, pool, ops, options, callback);
};
//
// Applies one remove operation or an array of remove operations to `ns`.
//
//   pool, ismaster, bson - forwarded unchanged to the write executor
//   ops                  - a single remove op or an array of them
//   options              - optional; `ordered` (default true) and
//                          `writeConcern` (default {w:1}) are read here
//   callback             - handed to the executor
//
WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
  options = options || {};
  // Default is ordered execution
  var ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
  ops = Array.isArray(ops) ? ops : [ops];

  // Write concern
  var writeConcern = options.writeConcern || {w:1};

  // Unordered batches and w:0 writes go through the unordered executor
  if(!ordered || writeConcern.w == 0) {
    return executeUnordered('remove', Remove, ismaster, ns, bson, pool, ops, options, callback);
  }

  return executeOrdered('remove', Remove, ismaster, ns, bson, pool, ops, options, callback);
};
//
// Asks the server to discard a cursor.
//
// The kill message is written without waiting for a reply, and only when a
// connected pool is available. The callback, when one is supplied, is always
// invoked with (null, null) whether or not anything was written.
// `ns` is accepted but not used here.
//
WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
  // Create a kill cursor command
  var killCursor = new KillCursor(bson, [cursorId]);
  // Execute the kill cursor command
  if(pool && pool.isConnected()) {
    pool.write(killCursor, {
      immediateRelease: true, noResponse: true
    });
  }

  // Callback
  if(typeof callback === 'function') callback(null, null);
};
//
// Fetches the next batch of documents for an open cursor.
//
// On success the batch is stored on `cursorState` (`documents`, `cursorId`)
// and the callback is invoked as callback(null, null, connection); the
// documents themselves are not passed to the callback. `options` is accepted
// but not used here.
//
WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
  // Create getMore command
  var getMore = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});

  // Query callback
  var queryCallback = function(err, result) {
    if(err) return callback(err);
    // Get the raw message
    var r = result.message;

    // If we have a timed out query or a cursor that was killed
    // (bit 0 of the response flags is set)
    if((r.responseFlags & (1 << 0)) != 0) {
      return callback(new MongoError("cursor does not exist, was killed or timed out"), null);
    }

    // Ensure we have a Long valued cursor id
    var cursorId = typeof r.cursorId === 'number'
      ? Long.fromNumber(r.cursorId)
      : r.cursorId;

    // Set all the values
    cursorState.documents = r.documents;
    cursorState.cursorId = cursorId;

    // Return
    callback(null, null, r.connection);
  };

  // If we have a raw query decorate the function
  if(raw) {
    queryCallback.raw = raw;
  }

  // The promote* settings are passed along as properties of the callback,
  // and only when the cursor state carries an explicit boolean for them
  if(typeof cursorState.promoteLongs === 'boolean') {
    queryCallback.promoteLongs = cursorState.promoteLongs;
  }

  if(typeof cursorState.promoteValues === 'boolean') {
    queryCallback.promoteValues = cursorState.promoteValues;
  }

  if(typeof cursorState.promoteBuffers === 'boolean') {
    queryCallback.promoteBuffers = cursorState.promoteBuffers;
  }

  // Write out the getMore command
  connection.write(getMore, queryCallback);
};
//
// Builds the query message used to open a cursor for `cmd`.
//
// Returns the query built by setupClassicFind when `cmd.find` is set, or by
// setupCommand for any other command. Returns undefined when the cursor state
// already carries a cursor id. Throws MongoError when no command is given and
// no cursor id exists.
//
WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
  // Establish type of command
  if(cmd && cmd.find) {
    return setupClassicFind(bson, ns, cmd, cursorState, topology, options);
  }

  // A cursor id is already known, there is nothing to build
  if(cursorState.cursorId != null) return;

  if(cmd) {
    return setupCommand(bson, ns, cmd, cursorState, topology, options);
  }

  throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
};
//
// Execute a find command
//
// Translates a find command document into a Query message.
//
//   cmd         - find command; `query`, `fields`, `batchSize`, `limit`,
//                 the modifier fields listed below and the cursor flag
//                 fields are read from it
//   cursorState - `limit`, `skip` and `batchSize` are read; `batchSize` is
//                 overwritten with `cmd.batchSize` when that is set
//   topology    - only `type` is read
//   options     - optional; `serializeFunctions` and `ignoreUndefined`
//
// Returns the Query. Throws MongoError when `cmd.readConcern` has a level
// other than 'local'.
//
var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
  // Ensure we have at least some options
  options = options || {};
  // Get the readPreference
  var readPreference = getReadPreference(cmd, options);
  // Set the optional batchSize
  cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
  var numberToReturn = 0;

  // Unpack the limit and batchSize values
  if(cursorState.limit == 0) {
    numberToReturn = cursorState.batchSize;
  } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
    numberToReturn = cursorState.limit;
  } else {
    numberToReturn = cursorState.batchSize;
  }

  var numberToSkip = cursorState.skip || 0;
  // Build actual find command
  var findCmd = {};
  // Using special modifier
  var usesSpecialModifier = false;

  // We have a Mongos topology, check if we need to add a readPreference
  if(topology.type == 'mongos' && readPreference) {
    findCmd['$readPreference'] = readPreference.toJSON();
    usesSpecialModifier = true;
  }

  // Add special modifiers to the query: [field on cmd, key in the query].
  // The order of this table is the order the keys are added in.
  var modifiers = [
    ['sort', 'orderby'],
    ['hint', '$hint'],
    ['snapshot', '$snapshot'],
    ['returnKey', '$returnKey'],
    ['maxScan', '$maxScan'],
    ['min', '$min'],
    ['max', '$max'],
    ['showDiskLoc', '$showDiskLoc'],
    ['comment', '$comment'],
    ['maxTimeMS', '$maxTimeMS']
  ];

  for(var i = 0; i < modifiers.length; i++) {
    var value = cmd[modifiers[i][0]];
    if(value) {
      findCmd[modifiers[i][1]] = value;
      usesSpecialModifier = true;
    }
  }

  if(cmd.explain) {
    // nToReturn must be 0 (match all) or negative (match N and close cursor)
    // nToReturn > 0 will give explain results equivalent to limit(0)
    numberToReturn = -Math.abs(cmd.limit || 0);
    usesSpecialModifier = true;
    findCmd['$explain'] = true;
  }

  // If we have a special modifier the filter moves under $query,
  // otherwise the filter itself is the query document
  if(usesSpecialModifier) {
    findCmd['$query'] = cmd.query;
  } else {
    findCmd = cmd.query;
  }

  // Throw on majority readConcern passed in
  if(cmd.readConcern && cmd.readConcern.level != 'local') {
    throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
  }

  // Remove readConcern, ensure no failing commands. Work on a copy so the
  // caller's command document is left alone.
  if(cmd.readConcern) {
    cmd = copy(cmd);
    delete cmd['readConcern'];
  }

  // Set up the serialize and ignoreUndefined fields
  var serializeFunctions = typeof options.serializeFunctions === 'boolean'
    ? options.serializeFunctions : false;
  var ignoreUndefined = typeof options.ignoreUndefined === 'boolean'
    ? options.ignoreUndefined : false;

  // Build Query object
  var query = new Query(bson, ns, findCmd, {
      numberToSkip: numberToSkip, numberToReturn: numberToReturn
    , checkKeys: false, returnFieldSelector: cmd.fields
    , serializeFunctions: serializeFunctions, ignoreUndefined: ignoreUndefined
  });

  // Set query flags
  query.slaveOk = readPreference.slaveOk();

  // Set up the option bits for wire protocol
  if(typeof cmd.tailable === 'boolean') query.tailable = cmd.tailable;
  if(typeof cmd.oplogReplay === 'boolean') query.oplogReplay = cmd.oplogReplay;
  if(typeof cmd.noCursorTimeout === 'boolean') query.noCursorTimeout = cmd.noCursorTimeout;
  if(typeof cmd.awaitData === 'boolean') query.awaitData = cmd.awaitData;
  if(typeof cmd.partial === 'boolean') query.partial = cmd.partial;

  // Return the query
  return query;
};
//
// Set up a command cursor
//
// Wraps an arbitrary command document in a Query against `<db>.$cmd`, where
// `<db>` is the part of `ns` before the first dot.
//
//   cmd      - command document; it is shallow-copied and never modified
//   topology - only `type` is read
//   options  - optional; `serializeFunctions` and `ignoreUndefined`
//
// Returns the Query. Throws MongoError when `cmd.readConcern` has a level
// other than 'local'. `cursorState` is accepted but not used here.
//
var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
  // Set empty options object
  options = options || {};
  // Get the readPreference
  var readPreference = getReadPreference(cmd, options);

  // Final query
  var finalCmd = {};
  for(var name in cmd) {
    finalCmd[name] = cmd[name];
  }

  // Build command namespace
  var db = ns.split(/\./).shift();

  // Throw on majority readConcern passed in
  if(cmd.readConcern && cmd.readConcern.level != 'local') {
    throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
  }

  // Remove readConcern, ensure no failing commands. It has to be removed
  // from the document that is actually sent; deleting it from `cmd` would
  // come too late for the copy above and would modify the caller's object.
  if(cmd.readConcern) delete finalCmd['readConcern'];

  // Serialize functions
  var serializeFunctions = typeof options.serializeFunctions === 'boolean'
    ? options.serializeFunctions : false;

  // Set up the serialize and ignoreUndefined fields
  var ignoreUndefined = typeof options.ignoreUndefined === 'boolean'
    ? options.ignoreUndefined : false;

  // We have a Mongos topology, check if we need to add a readPreference
  if(topology.type == 'mongos'
    && readPreference
    && readPreference.preference != 'primary') {
    finalCmd = {
      '$query': finalCmd,
      '$readPreference': readPreference.toJSON()
    };
  }

  // Build Query object
  var query = new Query(bson, f('%s.$cmd', db), finalCmd, {
      numberToSkip: 0, numberToReturn: -1
    , checkKeys: false, serializeFunctions: serializeFunctions
    , ignoreUndefined: ignoreUndefined
  });

  // Set query flags
  query.slaveOk = readPreference.slaveOk();

  // Return the query
  return query;
};
//
// Returns true when the write concern asks for an acknowledgement: `w` or
// `wtimeout` is truthy, `j` or `fsync` is loosely equal to true, or the
// object has no own enumerable keys at all. Returns false otherwise.
//
var hasWriteConcern = function(writeConcern) {
  return !!(writeConcern.w
    || writeConcern.wtimeout
    || writeConcern.j == true
    || writeConcern.fsync == true
    || Object.keys(writeConcern).length == 0);
};
//
// Returns a new object holding only the recognised write concern fields
// (the names in `writeConcernFields`) whose value on `writeConcern` is
// neither null nor undefined. The input object is not modified.
//
var cloneWriteConcern = function(writeConcern) {
  var wc = {};

  for(var i = 0; i < writeConcernFields.length; i++) {
    var field = writeConcernFields[i];
    if(writeConcern[field] != null) wc[field] = writeConcern[field];
  }

  return wc;
};
//
// Aggregate up all the results
//
// Folds the per-operation getLastError documents in `results` into a single
// result document and wraps it in a CommandResult.
//
//   opType     - 'insert', 'update' or 'remove'; `nModified` is only
//                tracked for 'update'
//   ops        - the operations, index-aligned with `results`
//   results    - one getLastError document per operation; null/undefined
//                entries are skipped
//   connection - passed through to CommandResult
//
var aggregateWriteOperationResults = function(opType, ops, results, connection) {
  var finalResult = { ok: 1, n: 0 };
  if(opType == 'update') {
    finalResult.nModified = 0;
  }

  // Map all the results coming back
  for(var i = 0; i < results.length; i++) {
    var result = results[i];
    var op = ops[i];

    // Nothing was recorded for this operation
    if(result == null) continue;

    if((result.upserted || (result.updatedExisting == false)) && finalResult.upserted == null) {
      finalResult.upserted = [];
    }

    // Push the upserted document to the list of upserted values
    if(result.upserted) {
      finalResult.upserted.push({index: i, _id: result.upserted});
    }

    // We have an upsert where we passed in a _id
    if(result.updatedExisting == false && result.n == 1 && result.upserted == null) {
      finalResult.upserted.push({index: i, _id: op.q._id});
    } else if(result.updatedExisting == true) {
      finalResult.nModified += result.n;
    }

    // We have an insert command
    if(result.ok == 1 && opType == 'insert' && result.err == null) {
      finalResult.n = finalResult.n + 1;
    }

    // We have a command error
    if(result.ok == 0 || result.err || result.errmsg) {
      var errmsg = result.errmsg || result.err || result.errMsg;

      if(result.ok == 0) finalResult.ok = 0;
      finalResult.code = result.code;
      finalResult.errmsg = errmsg;

      // Check if we have a write error. Code 14 is the code the write
      // executors in this file assign to serialization failures.
      if(result.code == 11000
        || result.code == 11001
        || result.code == 12582
        || result.code == 16544
        || result.code == 16538
        || result.code == 16542
        || result.code == 14
        || result.code == 13511) {
        if(finalResult.writeErrors == null) finalResult.writeErrors = [];
        finalResult.writeErrors.push({
            index: i
          , code: result.code
          , errmsg: errmsg
        });
      } else {
        // Any other failure is reported as the write concern error
        finalResult.writeConcernError = {
            code: result.code
          , errmsg: errmsg
        };
      }
    } else if(typeof result.n == 'number') {
      finalResult.n += result.n;
    } else {
      finalResult.n += 1;
    }

    // Result as expected
    if(result.lastOp) finalResult.lastOp = result.lastOp;
  }

  // Return finalResult aggregated results
  return new CommandResult(finalResult, connection);
};
//
// Execute all inserts in an ordered manner
//
// Writes the operations one at a time, each followed by a getLastError
// command, and only moves on to the next one once that getLastError has
// answered without an error. The callback receives the aggregated result of
// every getLastError document collected up to the point execution stopped.
//
//   opType  - 'insert', 'update' or 'remove', passed to the aggregation
//   command - constructor of the write message for one operation
//   ops     - operations to run; the array itself is not modified
//   options - `writeConcern` (default {w:1}) is read; the whole object is
//             also handed to `command`
//
var executeOrdered = function(opType ,command, ismaster, ns, bson, pool, ops, options, callback) {
  // Work on a copy, the list is consumed as operations are executed
  var _ops = ops.slice(0);
  // Collect all the getLastErrors
  var getLastErrors = [];

  // Execute an operation
  var executeOp = function(list) {
    // No more items in the list
    if(list.length == 0) {
      return process.nextTick(function() {
        callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, null));
      });
    }

    // Get the first operation
    var doc = list.shift();
    // Create the write command
    var op = new command(Query.getRequestId(), ismaster, bson, ns, [doc], options);
    // Write concern
    var optionWriteConcern = options.writeConcern || {w:1};
    // Final write concern
    var writeConcern = cloneWriteConcern(optionWriteConcern);

    // Get the db name
    var db = ns.split('.').shift();

    try {
      // Add binary message to list of commands to execute
      var commands = [op];

      // Add getLastOrdered
      var getLastErrorCmd = {getlasterror: 1};
      // Merge all the fields
      for(var i = 0; i < writeConcernFields.length; i++) {
        if(writeConcern[writeConcernFields[i]] != null) {
          getLastErrorCmd[writeConcernFields[i]] = writeConcern[writeConcernFields[i]];
        }
      }

      // Create a getLastError command
      var getLastErrorOp = new Query(bson, f("%s.$cmd", db), getLastErrorCmd, {numberToReturn: -1});
      // Add getLastError command to list of ops to execute
      commands.push(getLastErrorOp);

      // getLastError callback
      var getLastErrorCallback = function(err, result) {
        if(err) return callback(err);
        // Get the document
        var lastError = result.result;
        // Save the getLastError document
        getLastErrors.push(lastError);

        // If we have an error terminate
        if(lastError.ok == 0 || lastError.err || lastError.errmsg) {
          return callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, result.connection));
        }

        // Execute the next op in the list
        executeOp(list);
      };

      // Write both commands out at the same time
      pool.write(commands, getLastErrorCallback);
    } catch(err) {
      if(typeof err == 'string') err = new MongoError(err);
      // We have a serialization error, rewrite as a write error to have same behavior as modern
      // write commands
      getLastErrors.push({ ok: 1, errmsg: err.message, code: 14 });
      // Return due to an error
      process.nextTick(function() {
        callback(null, aggregateWriteOperationResults(opType, ops, getLastErrors, null));
      });
    }
  };

  // Execute the operations
  executeOp(_ops);
};
//
// Execute all operations without waiting for one to finish before writing
// the next.
//
// With an acknowledged write concern every operation is written together
// with a getLastError command, and the callback fires once all of them have
// answered: with the first-class error if any write reported one, otherwise
// with the aggregated result. Without one (w:0) the operations are written
// with no response expected and the callback receives a bare {ok:1} result.
// The callback is optional and is invoked at most once.
//
//   opType  - 'insert', 'update' or 'remove', passed to the aggregation
//   command - constructor of the write message for one operation
//   options - `writeConcern` (default {w:1}) is read; the whole object is
//             also handed to `command`
//
var executeUnordered = function(opType, command, ismaster, ns, bson, pool, ops, options, callback) {
  // Total operations to write
  var totalOps = ops.length;
  // Collect all the getLastErrors
  var getLastErrors = [];
  // Write concern
  var optionWriteConcern = options.writeConcern || {w:1};
  // Final write concern
  var writeConcern = cloneWriteConcern(optionWriteConcern);
  // Driver level error
  var error;
  // Get db name
  var db = ns.split('.').shift();
  // Set once the callback has been invoked
  var finished = false;

  // Invoke the callback a single time, and only if one was supplied
  var finish = function(err, result) {
    if(finished || typeof callback !== 'function') return;
    finished = true;
    callback(err, result);
  };

  // Give the result from getLastError the right index
  var callbackOp = function(_index) {
    return function(err, result) {
      if(err) error = err;
      // Update the number of operations executed
      totalOps = totalOps - 1;
      // Save the getLastError document
      if(!err) getLastErrors[_index] = result.result;
      // Check if we are done
      if(totalOps == 0) {
        process.nextTick(function() {
          if(error) return finish(error);
          finish(null, aggregateWriteOperationResults(opType, ops, getLastErrors, result.connection));
        });
      }
    };
  };

  // Execute all the operations
  for(var i = 0; i < ops.length; i++) {
    // Create the write command
    var op = new command(Query.getRequestId(), ismaster, bson, ns, [ops[i]], options);

    try {
      // Add binary message to list of commands to execute
      var commands = [op];

      // If write concern 0 don't fire getLastError
      if(hasWriteConcern(writeConcern)) {
        var getLastErrorCmd = {getlasterror: 1};
        // Merge all the fields
        for(var j = 0; j < writeConcernFields.length; j++) {
          if(writeConcern[writeConcernFields[j]] != null)
            getLastErrorCmd[writeConcernFields[j]] = writeConcern[writeConcernFields[j]];
        }

        // Create a getLastError command
        var getLastErrorOp = new Query(bson, f("%s.$cmd", db), getLastErrorCmd, {numberToReturn: -1});
        // Add getLastError command to list of ops to execute
        commands.push(getLastErrorOp);

        // Write both commands out at the same time
        pool.write(commands, callbackOp(i));
      } else {
        pool.write(commands, {immediateRelease:true, noResponse:true});
      }
    } catch(err) {
      if(typeof err == 'string') err = new MongoError(err);
      // Update the number of operations executed
      totalOps = totalOps - 1;
      // We have a serialization error, rewrite as a write error to have same behavior as modern
      // write commands
      getLastErrors[i] = { ok: 1, errmsg: err.message, code: 14 };
      // Check if we are done
      if(totalOps == 0) {
        finish(null, aggregateWriteOperationResults(opType, ops, getLastErrors, null));
      }
    }
  }

  // Empty w:0 return
  if(writeConcern
    && writeConcern.w == 0 && callback) {
    finish(null, new CommandResult({ok:1}, null));
  }
};
// The module's only export is the WireProtocol constructor.
module.exports = WireProtocol;