3 var Query = require('../connection/commands').Query
4 , retrieveBSON = require('../connection/utils').retrieveBSON
5 , f = require('util').format
6 , MongoError = require('../error')
7 , getReadPreference = require('./shared').getReadPreference;
9 var BSON = retrieveBSON(),
12 var WireProtocol = function(legacyWireProtocol) {
13 this.legacyWireProtocol = legacyWireProtocol;
17 // Execute a write operation
18 var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
19 if(ops.length == 0) throw new MongoError("insert must contain at least one document");
20 if(typeof options == 'function') {
23 options = options || {};
26 // Split the ns up to get db and collection
27 var p = ns.split(".");
30 var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
31 var writeConcern = options.writeConcern;
34 var writeCommand = {};
35 writeCommand[type] = p.join('.');
36 writeCommand[opsField] = ops;
37 writeCommand.ordered = ordered;
39 // Did we specify a write concern
40 if(writeConcern && Object.keys(writeConcern).length > 0) {
41 writeCommand.writeConcern = writeConcern;
44 // If we have collation passed in
45 if(options.collation) {
46 for(var i = 0; i < writeCommand[opsField].length; i++) {
47 if(!writeCommand[opsField][i].collation) {
48 writeCommand[opsField][i].collation = options.collation;
53 // Do we have bypassDocumentValidation set, then enable it on the write command
54 if(typeof options.bypassDocumentValidation == 'boolean') {
55 writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
59 var opts = { command: true };
60 var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
61 if(type == 'insert') queryOptions.checkKeys = true;
63 // Ensure we support serialization of functions
64 if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
65 // Do not serialize the undefined fields
66 if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;
69 // Create write command
70 var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);
72 pool.write(cmd, opts, callback);
79 // Needs to support legacy mass insert as well as ordered/unordered legacy
82 WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
83 executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
86 WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
87 executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
90 WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
91 executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
94 WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
95 // Build command namespace
96 var parts = ns.split(/\./);
98 var commandns = f('%s.$cmd', parts.shift());
99 // Create getMore command
100 var killcursorCmd = {
101 killCursors: parts.join('.'),
105 // Build Query object
106 var query = new Query(bson, commandns, killcursorCmd, {
107 numberToSkip: 0, numberToReturn: -1
108 , checkKeys: false, returnFieldSelector: null
112 query.slaveOk = true;
114 // Kill cursor callback
115 var killCursorCallback = function(err, result) {
117 if(typeof callback != 'function') return;
118 return callback(err);
122 var r = result.message;
123 // If we have a timed out query or a cursor that was killed
124 if((r.responseFlags & (1 << 0)) != 0) {
125 if(typeof callback != 'function') return;
126 return callback(new MongoError("cursor killed or timed out"), null);
129 if(!Array.isArray(r.documents) || r.documents.length == 0) {
130 if(typeof callback != 'function') return;
131 return callback(new MongoError(f('invalid killCursors result returned for cursor id %s', cursorId)));
135 if(typeof callback == 'function') {
136 callback(null, r.documents[0]);
140 // Execute the kill cursor command
141 if(pool && pool.isConnected()) {
144 }, killCursorCallback);
148 WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
149 options = options || {};
150 // Build command namespace
151 var parts = ns.split(/\./);
153 var commandns = f('%s.$cmd', parts.shift());
155 // Create getMore command
157 getMore: cursorState.cursorId,
158 collection: parts.join('.'),
159 batchSize: Math.abs(batchSize)
162 if(cursorState.cmd.tailable
163 && typeof cursorState.cmd.maxAwaitTimeMS == 'number') {
164 getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS;
167 // Build Query object
168 var query = new Query(bson, commandns, getMoreCmd, {
169 numberToSkip: 0, numberToReturn: -1
170 , checkKeys: false, returnFieldSelector: null
174 query.slaveOk = true;
177 var queryCallback = function(err, result) {
178 if(err) return callback(err);
179 // Get the raw message
180 var r = result.message;
182 // If we have a timed out query or a cursor that was killed
183 if((r.responseFlags & (1 << 0)) != 0) {
184 return callback(new MongoError("cursor killed or timed out"), null);
187 // Raw, return all the extracted documents
189 cursorState.documents = r.documents;
190 cursorState.cursorId = r.cursorId;
191 return callback(null, r.documents);
194 // We have an error detected
195 if(r.documents[0].ok == 0) {
196 return callback(MongoError.create(r.documents[0]));
199 // Ensure we have a Long valid cursor id
200 var cursorId = typeof r.documents[0].cursor.id == 'number'
201 ? Long.fromNumber(r.documents[0].cursor.id)
202 : r.documents[0].cursor.id;
204 // Set all the values
205 cursorState.documents = r.documents[0].cursor.nextBatch;
206 cursorState.cursorId = cursorId;
209 callback(null, r.documents[0], r.connection);
213 var queryOptions = { command: true };
215 // If we have a raw query decorate the function
217 queryOptions.raw = raw;
220 // Add the result field needed
221 queryOptions.documentsReturnedIn = 'nextBatch';
223 // Check if we need to promote longs
224 if(typeof cursorState.promoteLongs == 'boolean') {
225 queryOptions.promoteLongs = cursorState.promoteLongs;
228 if(typeof cursorState.promoteValues == 'boolean') {
229 queryCallback.promoteValues = cursorState.promoteValues;
232 if(typeof cursorState.promoteBuffers == 'boolean') {
233 queryCallback.promoteBuffers = cursorState.promoteBuffers;
236 // Write out the getMore command
237 connection.write(query, queryOptions, queryCallback);
240 WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
241 // Establish type of command
243 // Create the find command
244 var query = executeFindCommand(bson, ns, cmd, cursorState, topology, options)
245 // Mark the cmd as virtual
247 // Signal the documents are in the firstBatch value
248 query.documentsReturnedIn = 'firstBatch';
251 } else if(cursorState.cursorId != null) {
254 return setupCommand(bson, ns, cmd, cursorState, topology, options);
256 throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
265 // , fields: <object>
268 // , explain: <boolean>
269 // , snapshot: <boolean>
271 // , returnKey: <boolean>
275 // , showDiskLoc: <boolean>
276 // , comment: <string>
279 // , readPreference: <ReadPreference>
280 // , tailable: <boolean>
281 // , oplogReplay: <boolean>
282 // , noCursorTimeout: <boolean>
283 // , awaitdata: <boolean>
284 // , exhaust: <boolean>
285 // , partial: <boolean>
291 // “filter”: { ... },
293 // “projection”: { ... },
297 // “batchSize”: <int>,
298 // “singleBatch”: <bool>,
299 // “comment”: <string>,
301 // “maxTimeMS”: <int>,
304 // “returnKey”: <bool>,
305 // “showRecordId”: <bool>,
306 // “snapshot”: <bool>,
307 // “tailable”: <bool>,
308 // “oplogReplay”: <bool>,
309 // “noCursorTimeout”: <bool>,
310 // “awaitData”: <bool>,
311 // “partial”: <bool>,
312 // “$readPreference”: { ... }
316 // Execute a find command
317 var executeFindCommand = function(bson, ns, cmd, cursorState, topology, options) {
318 // Ensure we have at least some options
319 options = options || {};
320 // Get the readPreference
321 var readPreference = getReadPreference(cmd, options);
322 // Set the optional batchSize
323 cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
325 // Build command namespace
326 var parts = ns.split(/\./);
328 var commandns = f('%s.$cmd', parts.shift());
330 // Build actual find command
332 find: parts.join('.')
335 // I we provided a filter
337 // Check if the user is passing in the $query parameter
338 if(cmd.query['$query']) {
339 findCmd.filter = cmd.query['$query'];
341 findCmd.filter = cmd.query;
346 var sortValue = cmd.sort;
348 // Handle issue of sort being an Array
349 if(Array.isArray(sortValue)) {
352 if(sortValue.length > 0 && !Array.isArray(sortValue[0])) {
353 var sortDirection = sortValue[1];
354 // Translate the sort order text
355 if(sortDirection == 'asc') {
357 } else if(sortDirection == 'desc') {
361 // Set the sort order
362 sortObject[sortValue[0]] = sortDirection;
364 for(var i = 0; i < sortValue.length; i++) {
365 sortDirection = sortValue[i][1];
366 // Translate the sort order text
367 if(sortDirection == 'asc') {
369 } else if(sortDirection == 'desc') {
373 // Set the sort order
374 sortObject[sortValue[i][0]] = sortDirection;
378 sortValue = sortObject;
381 // Add sort to command
382 if(cmd.sort) findCmd.sort = sortValue;
383 // Add a projection to the command
384 if(cmd.fields) findCmd.projection = cmd.fields;
385 // Add a hint to the command
386 if(cmd.hint) findCmd.hint = cmd.hint;
388 if(cmd.skip) findCmd.skip = cmd.skip;
390 if(cmd.limit) findCmd.limit = cmd.limit;
392 if(typeof cmd.batchSize == 'number') findCmd.batchSize = Math.abs(cmd.batchSize);
394 // Check if we wish to have a singleBatch
396 findCmd.limit = Math.abs(cmd.limit);
397 findCmd.singleBatch = true;
400 // If we have comment set
401 if(cmd.comment) findCmd.comment = cmd.comment;
403 // If we have maxScan
404 if(cmd.maxScan) findCmd.maxScan = cmd.maxScan;
406 // If we have maxTimeMS set
407 if(cmd.maxTimeMS) findCmd.maxTimeMS = cmd.maxTimeMS;
410 if(cmd.min) findCmd.min = cmd.min;
413 if(cmd.max) findCmd.max = cmd.max;
415 // If we have returnKey set
416 if(cmd.returnKey) findCmd.returnKey = cmd.returnKey;
418 // If we have showDiskLoc set
419 if(cmd.showDiskLoc) findCmd.showRecordId = cmd.showDiskLoc;
421 // If we have snapshot set
422 if(cmd.snapshot) findCmd.snapshot = cmd.snapshot;
424 // If we have tailable set
425 if(cmd.tailable) findCmd.tailable = cmd.tailable;
427 // If we have oplogReplay set
428 if(cmd.oplogReplay) findCmd.oplogReplay = cmd.oplogReplay;
430 // If we have noCursorTimeout set
431 if(cmd.noCursorTimeout) findCmd.noCursorTimeout = cmd.noCursorTimeout;
433 // If we have awaitData set
434 if(cmd.awaitData) findCmd.awaitData = cmd.awaitData;
435 if(cmd.awaitdata) findCmd.awaitData = cmd.awaitdata;
437 // If we have partial set
438 if(cmd.partial) findCmd.partial = cmd.partial;
440 // If we have collation passed in
441 if(cmd.collation) findCmd.collation = cmd.collation;
443 // If we have explain, we need to rewrite the find command
444 // to wrap it in the explain command
451 // Did we provide a readConcern
452 if(cmd.readConcern) findCmd.readConcern = cmd.readConcern;
454 // Set up the serialize and ignoreUndefined fields
455 var serializeFunctions = typeof options.serializeFunctions == 'boolean'
456 ? options.serializeFunctions : false;
457 var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
458 ? options.ignoreUndefined : false;
460 // We have a Mongos topology, check if we need to add a readPreference
461 if(topology.type == 'mongos'
463 && readPreference.preference != 'primary') {
466 '$readPreference': readPreference.toJSON()
470 // Build Query object
471 var query = new Query(bson, commandns, findCmd, {
472 numberToSkip: 0, numberToReturn: 1
473 , checkKeys: false, returnFieldSelector: null
474 , serializeFunctions: serializeFunctions, ignoreUndefined: ignoreUndefined
478 query.slaveOk = readPreference.slaveOk();
485 // Set up a command cursor
486 var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
487 // Set empty options object
488 options = options || {}
489 // Get the readPreference
490 var readPreference = getReadPreference(cmd, options);
494 for(var name in cmd) {
495 finalCmd[name] = cmd[name];
498 // Build command namespace
499 var parts = ns.split(/\./);
501 // Serialize functions
502 var serializeFunctions = typeof options.serializeFunctions == 'boolean'
503 ? options.serializeFunctions : false;
505 // Set up the serialize and ignoreUndefined fields
506 var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
507 ? options.ignoreUndefined : false;
509 // We have a Mongos topology, check if we need to add a readPreference
510 if(topology.type == 'mongos'
512 && readPreference.preference != 'primary') {
515 '$readPreference': readPreference.toJSON()
519 // Build Query object
520 var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
521 numberToSkip: 0, numberToReturn: -1
522 , checkKeys: false, serializeFunctions: serializeFunctions
523 , ignoreUndefined: ignoreUndefined
527 query.slaveOk = readPreference.slaveOk();
533 module.exports = WireProtocol;