3 var Insert = require('./commands').Insert
4 , Update = require('./commands').Update
5 , Remove = require('./commands').Remove
6 , Query = require('../connection/commands').Query
7 , copy = require('../connection/utils').copy
8 , KillCursor = require('../connection/commands').KillCursor
9 , GetMore = require('../connection/commands').GetMore
10 , Query = require('../connection/commands').Query
11 , ReadPreference = require('../topologies/read_preference')
12 , f = require('util').format
13 , CommandResult = require('../connection/command_result')
14 , MongoError = require('../error')
15 , Long = require('bson').Long
16 , getReadPreference = require('./shared').getReadPreference;
// Constructor: stores the supplied `legacyWireProtocol` on the new instance as
// `this.legacyWireProtocol`. Nothing else is initialised in the visible body.
// NOTE(review): this listing omits the lines that close the function (embedded
// line numbers jump from 19 to 23).
18 var WireProtocol = function(legacyWireProtocol) {
19 this.legacyWireProtocol = legacyWireProtocol;
/**
 * Execute a write operation (insert, update or delete) as a write command.
 *
 * Builds `{ <type>: <collection>, <opsField>: ops, ordered: <bool>, ... }`,
 * wraps it in a Query against `<db>.$cmd` and hands it to `pool.write`.
 *
 * @param {object} pool Connection pool; only `pool.write(cmd, opts, callback)` is used.
 * @param {object} bson BSON serializer passed through to the Query constructor.
 * @param {string} type Write command name ('insert', 'update' or 'delete').
 * @param {string} opsField Command field that carries the operations
 *   ('documents', 'updates' or 'deletes').
 * @param {string} ns Full namespace, `<db>.<collection>`.
 * @param {object[]} ops Operations to send; must contain at least one entry.
 * @param {object|function} [options] Options, or the callback when options are omitted.
 *   Recognised: ordered, writeConcern, collation, bypassDocumentValidation,
 *   serializeFunctions, ignoreUndefined.
 * @param {function} [callback] Passed to `pool.write`; also receives any error thrown
 *   while building or dispatching the command.
 * @throws {MongoError} When `ops` is empty.
 */
var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
  // Name the actual command in the error (previously always said "insert")
  if(ops.length == 0) throw new MongoError(f("%s must contain at least one document", type));

  // Support the (..., ops, callback) call shape
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  // Tolerate a null/undefined options argument
  options = options || {};

  // Split the ns up to get db and collection
  var p = ns.split(".");
  var d = p.shift();

  // Options
  var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
  var writeConcern = options.writeConcern;

  // Command skeleton; collection names may themselves contain dots
  var writeCommand = {};
  writeCommand[type] = p.join('.');
  writeCommand[opsField] = ops;
  writeCommand.ordered = ordered;

  // Did we specify a write concern
  if(writeConcern && Object.keys(writeConcern).length > 0) {
    writeCommand.writeConcern = writeConcern;
  }

  // If we have collation passed in, apply it to every operation that does not
  // already carry its own (note: this writes onto the caller's op objects)
  if(options.collation) {
    for(var i = 0; i < writeCommand[opsField].length; i++) {
      if(!writeCommand[opsField][i].collation) {
        writeCommand[opsField][i].collation = options.collation;
      }
    }
  }

  // Do we have bypassDocumentValidation set, then enable it on the write command
  if(typeof options.bypassDocumentValidation == 'boolean') {
    writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
  }

  // Options object
  var opts = { command: true };
  var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
  // Only inserts have their keys checked
  if(type == 'insert') queryOptions.checkKeys = true;

  // Ensure we support serialization of functions
  if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
  // Do not serialize the undefined fields
  if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;

  try {
    // Create write command
    var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);
    // Execute command
    pool.write(cmd, opts, callback);
  } catch(err) {
    // Without a callback there is nowhere to report to; do not swallow the error
    if(typeof callback != 'function') throw err;
    callback(err);
  }
};
85 // Needs to support legacy mass insert as well as ordered/unordered legacy
// Insert: forwards `ops` to executeWrite as an 'insert' command, with the
// operations placed in the command's 'documents' field.
// `ismaster` is accepted but not used in the visible body.
88 WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
89 executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
// Update: forwards `ops` to executeWrite as an 'update' command, with the
// operations placed in the command's 'updates' field.
// `ismaster` is accepted but not used in the visible body.
92 WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
93 executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
// Remove: forwards `ops` to executeWrite as a 'delete' command (the method is
// named remove, the command sent is 'delete'), with the operations placed in
// the command's 'deletes' field.
// `ismaster` is accepted but not used in the visible body.
96 WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
97 executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
/**
 * Kill a server-side cursor using the `killCursors` command.
 *
 * The command is only sent when `pool` is set and `pool.isConnected()` is
 * truthy; otherwise nothing is sent and `callback` is not invoked.
 *
 * @param {object} bson BSON serializer passed through to the Query constructor.
 * @param {string} ns Full namespace (`<db>.<collection>`) the cursor belongs to.
 * @param {*} cursorId Id of the cursor to kill.
 * @param {object} pool Connection pool used to send the command.
 * @param {function} [callback] Optional `callback(err, doc)`; on success `doc`
 *   is the first document of the reply.
 */
WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
  // Build command namespace
  var parts = ns.split(/\./);
  // Command namespace is "<db>.$cmd"; `parts` keeps the collection segments
  var commandns = f('%s.$cmd', parts.shift());

  // Create killCursors command
  var killcursorCmd = {
    killCursors: parts.join('.'),
    cursors: [cursorId]
  };

  // Build Query object
  var query = new Query(bson, commandns, killcursorCmd, {
      numberToSkip: 0, numberToReturn: -1
    , checkKeys: false, returnFieldSelector: null
  });

  // Set query flags
  query.slaveOk = true;

  // Kill cursor callback
  var killCursorCallback = function(err, result) {
    if(err) {
      if(typeof callback != 'function') return;
      return callback(err);
    }

    // Result
    var r = result.message;
    // If we have a timed out query or a cursor that was killed
    // (bit 0 of the reply's responseFlags is set)
    if((r.responseFlags & (1 << 0)) != 0) {
      if(typeof callback != 'function') return;
      return callback(new MongoError("cursor killed or timed out"), null);
    }

    if(!Array.isArray(r.documents) || r.documents.length == 0) {
      if(typeof callback != 'function') return;
      // Report the id we were asked to kill. This used to read
      // `cursorState.cursorId`, but no `cursorState` exists in this scope,
      // so the error path itself threw a ReferenceError.
      return callback(new MongoError(f('invalid killCursors result returned for cursor id %s', cursorId)));
    }

    // Return the result
    if(typeof callback == 'function') {
      callback(null, r.documents[0]);
    }
  };

  // Execute the kill cursor command
  if(pool && pool.isConnected()) {
    pool.write(query, {
      command: true
    }, killCursorCallback);
  }
};
/**
 * Fetch the next batch for an open cursor using the `getMore` command.
 *
 * On success `cursorState.documents` and `cursorState.cursorId` are updated
 * from the reply before `callback` is invoked.
 *
 * @param {object} bson BSON serializer passed through to the Query constructor.
 * @param {string} ns Full namespace (`<db>.<collection>`) of the cursor.
 * @param {object} cursorState Cursor state; reads `cursorId`, `cmd.tailable`,
 *   `cmd.maxAwaitTimeMS`, `promoteLongs`, `promoteValues`, `promoteBuffers`.
 * @param {number} batchSize Requested batch size; its absolute value is sent.
 * @param {boolean} raw When truthy, the reply documents are returned as-is.
 * @param {object} connection Target of `connection.write(query, options, callback)`.
 * @param {object} [options] Currently unused.
 * @param {function} callback `callback(err, result, connection)`; `result` is the
 *   reply's document array in raw mode, otherwise the first reply document.
 */
WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
  // Build command namespace
  var parts = ns.split(/\./);
  // Command namespace is "<db>.$cmd"; `parts` keeps the collection segments
  var commandns = f('%s.$cmd', parts.shift());

  // Create getMore command
  var getMoreCmd = {
    getMore: cursorState.cursorId,
    collection: parts.join('.'),
    batchSize: Math.abs(batchSize)
  };

  // For tailable cursors, forward maxAwaitTimeMS as the getMore's maxTimeMS
  if(cursorState.cmd.tailable
    && typeof cursorState.cmd.maxAwaitTimeMS == 'number') {
    getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS;
  }

  // Build Query object
  var query = new Query(bson, commandns, getMoreCmd, {
      numberToSkip: 0, numberToReturn: -1
    , checkKeys: false, returnFieldSelector: null
  });

  // Set query flags
  query.slaveOk = true;

  // Query callback
  var queryCallback = function(err, result) {
    if(err) return callback(err);
    // Get the raw message
    var r = result.message;

    // If we have a timed out query or a cursor that was killed
    // (bit 0 of the reply's responseFlags is set)
    if((r.responseFlags & (1 << 0)) != 0) {
      return callback(new MongoError("cursor killed or timed out"), null);
    }

    // Raw, return all the extracted documents
    if(raw) {
      cursorState.documents = r.documents;
      cursorState.cursorId = r.cursorId;
      return callback(null, r.documents);
    }

    // We have an error detected
    if(r.documents[0].ok == 0) {
      return callback(MongoError.create(r.documents[0]));
    }

    // Ensure we have a Long valid cursor id
    var cursorId = typeof r.documents[0].cursor.id == 'number'
      ? Long.fromNumber(r.documents[0].cursor.id)
      : r.documents[0].cursor.id;

    // Set all the values
    cursorState.documents = r.documents[0].cursor.nextBatch;
    cursorState.cursorId = cursorId;

    // Return the result
    callback(null, r.documents[0], r.connection);
  };

  // Query options
  var queryOptions = { command: true };

  // If we have a raw query decorate the function
  if(raw) {
    queryOptions.raw = raw;
  }

  // Add the result field needed
  queryOptions.documentsReturnedIn = 'nextBatch';

  // Check if we need to promote longs
  if(typeof cursorState.promoteLongs == 'boolean') {
    queryOptions.promoteLongs = cursorState.promoteLongs;
  }

  // promoteValues / promoteBuffers belong on the write options next to
  // promoteLongs; they were previously attached to the callback function,
  // so they never formed part of the options passed to connection.write.
  if(typeof cursorState.promoteValues == 'boolean') {
    queryOptions.promoteValues = cursorState.promoteValues;
  }

  if(typeof cursorState.promoteBuffers == 'boolean') {
    queryOptions.promoteBuffers = cursorState.promoteBuffers;
  }

  // Write out the getMore command
  connection.write(query, queryOptions, queryCallback);
};
// Build the query object for a command that is expected to yield a cursor.
//
// From the visible lines:
//   - one branch builds a find query via executeFindCommand and marks the
//     reply documents as living in the 'firstBatch' field;
//   - one branch is taken when cursorState.cursorId is not null/undefined;
//   - one branch returns setupCommand(...) for the command;
//   - otherwise a MongoError "command ... does not return a cursor" is thrown.
//
// NOTE(review): this listing omits several original lines (embedded numbers
// 251, 255, 258-259, 261, 263, 265+), including the conditions guarding the
// first, third and last branches and whatever is returned from the find
// branch. Confirm against the full file before relying on branch order.
249 WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
250 // Establish type of command
252 // Create the find command
253 var query = executeFindCommand(bson, ns, cmd, cursorState, topology, options)
254 // Mark the cmd as virtual
256 // Signal the documents are in the firstBatch value
257 query.documentsReturnedIn = 'firstBatch';
260 } else if(cursorState.cursorId != null) {
262 return setupCommand(bson, ns, cmd, cursorState, topology, options);
// JSON.stringify(cmd) is only evaluated on this error path
264 throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
273 // , fields: <object>
276 // , explain: <boolean>
277 // , snapshot: <boolean>
279 // , returnKey: <boolean>
283 // , showDiskLoc: <boolean>
284 // , comment: <string>
287 // , readPreference: <ReadPreference>
288 // , tailable: <boolean>
289 // , oplogReplay: <boolean>
290 // , noCursorTimeout: <boolean>
291 // , awaitdata: <boolean>
292 // , exhaust: <boolean>
293 // , partial: <boolean>
299 // “filter”: { ... },
301 // “projection”: { ... },
305 // “batchSize”: <int>,
306 // “singleBatch”: <bool>,
307 // “comment”: <string>,
309 // “maxTimeMS”: <int>,
312 // “returnKey”: <bool>,
313 // “showRecordId”: <bool>,
314 // “snapshot”: <bool>,
315 // “tailable”: <bool>,
316 // “oplogReplay”: <bool>,
317 // “noCursorTimeout”: <bool>,
318 // “awaitData”: <bool>,
319 // “partial”: <bool>,
320 // “$readPreference”: { ... }
324 // Execute a find command
//
// Translates a legacy-style cursor command (`cmd`) into a `find` command
// document and wraps it in a Query against "<db>.$cmd".
//
// Visible behaviour:
//   - `options` defaults to {}; the read preference comes from
//     getReadPreference(cmd, options).
//   - cursorState.batchSize is overwritten with cmd.batchSize when that is truthy.
//   - `ns` is split on "."; the first segment becomes the "<db>.$cmd" namespace
//     and the remaining segments, re-joined, become the `find` target.
//   - cmd fields are copied onto the find command, some under a different name
//     (fields -> projection, showDiskLoc -> showRecordId, awaitdata -> awaitData,
//     query / query.$query -> filter).
//   - the Query is built with numberToSkip 0 / numberToReturn 1 and its slaveOk
//     flag is taken from readPreference.slaveOk().
//
// NOTE(review): this listing omits many original lines (see the gaps in the
// embedded line numbers): the declaration of `findCmd` and `sortObject`, the
// values 'asc'/'desc' are translated to, the guard around the singleBatch
// block, the explain wrapper (453-458), the body of the mongos branch and the
// function's return statement. Nothing below documents those parts.
325 var executeFindCommand = function(bson, ns, cmd, cursorState, topology, options) {
326 // Ensure we have at least some options
327 options = options || {};
328 // Get the readPreference
329 var readPreference = getReadPreference(cmd, options);
330 // Set the optional batchSize
// (a falsy cmd.batchSize, including 0, keeps the existing cursorState value)
331 cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
333 // Build command namespace
334 var parts = ns.split(/\./);
336 var commandns = f('%s.$cmd', parts.shift());
338 // Build actual find command
340 find: parts.join('.')
343 // If we provided a filter
345 // Check if the user is passing in the $query parameter
346 if(cmd.query['$query']) {
347 findCmd.filter = cmd.query['$query'];
349 findCmd.filter = cmd.query;
354 var sortValue = cmd.sort;
356 // Handle issue of sort being an Array
// Two array shapes are handled: a single [field, direction] pair, and a list
// of such pairs. Both are folded into `sortObject` keyed by field name.
357 if(Array.isArray(sortValue)) {
360 if(sortValue.length > 0 && !Array.isArray(sortValue[0])) {
361 var sortDirection = sortValue[1];
362 // Translate the sort order text
363 if(sortDirection == 'asc') {
365 } else if(sortDirection == 'desc') {
369 // Set the sort order
370 sortObject[sortValue[0]] = sortDirection;
372 for(var i = 0; i < sortValue.length; i++) {
// `sortDirection` is re-declared with var here; it is the same
// function-scoped variable as the one declared in the branch above.
373 var sortDirection = sortValue[i][1];
374 // Translate the sort order text
375 if(sortDirection == 'asc') {
377 } else if(sortDirection == 'desc') {
381 // Set the sort order
382 sortObject[sortValue[i][0]] = sortDirection;
386 sortValue = sortObject;
389 // Add sort to command
390 if(cmd.sort) findCmd.sort = sortValue;
391 // Add a projection to the command
392 if(cmd.fields) findCmd.projection = cmd.fields;
393 // Add a hint to the command
394 if(cmd.hint) findCmd.hint = cmd.hint;
// skip and limit are copied only when truthy, so a value of 0 is not sent
396 if(cmd.skip) findCmd.skip = cmd.skip;
398 if(cmd.limit) findCmd.limit = cmd.limit;
400 if(typeof cmd.batchSize == 'number') findCmd.batchSize = Math.abs(cmd.batchSize);
402 // Check if we wish to have a singleBatch
// NOTE(review): the condition guarding the next two lines is elided from this
// listing (line 403); presumably it tests for a negative limit, confirm.
404 findCmd.limit = Math.abs(cmd.limit);
405 findCmd.singleBatch = true;
408 // If we have comment set
409 if(cmd.comment) findCmd.comment = cmd.comment;
411 // If we have maxScan
412 if(cmd.maxScan) findCmd.maxScan = cmd.maxScan;
414 // If we have maxTimeMS set
415 if(cmd.maxTimeMS) findCmd.maxTimeMS = cmd.maxTimeMS;
418 if(cmd.min) findCmd.min = cmd.min;
421 if(cmd.max) findCmd.max = cmd.max;
423 // If we have returnKey set
424 if(cmd.returnKey) findCmd.returnKey = cmd.returnKey;
426 // If we have showDiskLoc set
// (sent under the find command's name for it, showRecordId)
427 if(cmd.showDiskLoc) findCmd.showRecordId = cmd.showDiskLoc;
429 // If we have snapshot set
430 if(cmd.snapshot) findCmd.snapshot = cmd.snapshot;
432 // If we have tailable set
433 if(cmd.tailable) findCmd.tailable = cmd.tailable;
435 // If we have oplogReplay set
436 if(cmd.oplogReplay) findCmd.oplogReplay = cmd.oplogReplay;
438 // If we have noCursorTimeout set
439 if(cmd.noCursorTimeout) findCmd.noCursorTimeout = cmd.noCursorTimeout;
441 // If we have awaitData set
// Both spellings are accepted; when both are truthy the lower-case
// `awaitdata` value wins because it is assigned last.
442 if(cmd.awaitData) findCmd.awaitData = cmd.awaitData;
443 if(cmd.awaitdata) findCmd.awaitData = cmd.awaitdata;
445 // If we have partial set
446 if(cmd.partial) findCmd.partial = cmd.partial;
448 // If we have collation passed in
449 if(cmd.collation) findCmd.collation = cmd.collation;
451 // If we have explain, we need to rewrite the find command
452 // to wrap it in the explain command
// NOTE(review): the code implementing the explain rewrite (453-458) is not
// present in this listing.
459 // Did we provide a readConcern
460 if(cmd.readConcern) findCmd.readConcern = cmd.readConcern;
462 // Set up the serialize and ignoreUndefined fields
// Only an explicit boolean in `options` is honoured; anything else means false.
463 var serializeFunctions = typeof options.serializeFunctions == 'boolean'
464 ? options.serializeFunctions : false;
465 var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
466 ? options.ignoreUndefined : false;
468 // We have a Mongos topology, check if we need to add a readPreference
469 if(topology.type == 'mongos'
471 && readPreference.preference != 'primary') {
474 '$readPreference': readPreference.toJSON()
478 // Build Query object
479 var query = new Query(bson, commandns, findCmd, {
480 numberToSkip: 0, numberToReturn: 1
481 , checkKeys: false, returnFieldSelector: null
482 , serializeFunctions: serializeFunctions, ignoreUndefined: ignoreUndefined
486 query.slaveOk = readPreference.slaveOk();
493 // Set up a command cursor
//
// Wraps an arbitrary command document in a Query against "<db>.$cmd".
//
// Visible behaviour:
//   - `options` defaults to {}; the read preference comes from
//     getReadPreference(cmd, options).
//   - every enumerable property of `cmd` is copied onto `finalCmd`, so the
//     caller's `cmd` object is not the one handed to Query.
//   - the Query is built with numberToSkip 0 / numberToReturn -1 and its
//     slaveOk flag is taken from readPreference.slaveOk().
//
// NOTE(review): this listing omits several original lines (see the gaps in the
// embedded line numbers): the declaration of `finalCmd`, the body of the
// mongos branch that attaches '$readPreference', and the function's return
// statement. Nothing below documents those parts.
494 var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
495 // Set empty options object
496 options = options || {}
497 // Get the readPreference
498 var readPreference = getReadPreference(cmd, options);
// for...in also visits inherited enumerable properties of `cmd`
502 for(var name in cmd) {
503 finalCmd[name] = cmd[name];
506 // Build command namespace
507 var parts = ns.split(/\./);
509 // Serialize functions
// Only an explicit boolean in `options` is honoured; anything else means false.
510 var serializeFunctions = typeof options.serializeFunctions == 'boolean'
511 ? options.serializeFunctions : false;
513 // Set up the serialize and ignoreUndefined fields
514 var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
515 ? options.ignoreUndefined : false;
517 // We have a Mongos topology, check if we need to add a readPreference
518 if(topology.type == 'mongos'
520 && readPreference.preference != 'primary') {
523 '$readPreference': readPreference.toJSON()
527 // Build Query object
// Only the first namespace segment (the database name) is used here
528 var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
529 numberToSkip: 0, numberToReturn: -1
530 , checkKeys: false, serializeFunctions: serializeFunctions
531 , ignoreUndefined: ignoreUndefined
535 query.slaveOk = readPreference.slaveOk();
541 module.exports = WireProtocol;