3 var copy = require('../connection/utils').copy
4 , retrieveBSON = require('../connection/utils').retrieveBSON
5 , KillCursor = require('../connection/commands').KillCursor
6 , GetMore = require('../connection/commands').GetMore
7 , Query = require('../connection/commands').Query
8 , f = require('util').format
9 , MongoError = require('../error')
10 , getReadPreference = require('./shared').getReadPreference;
// BSON implementation obtained from the connection utilities. Long is the
// type used further down (Long.fromNumber) to wrap numeric cursor ids.
var BSON = retrieveBSON(),
  Long = BSON.Long;

/**
 * Wire protocol handler. The constructor itself does nothing; the write,
 * cursor and command operations are attached to its prototype.
 */
var WireProtocol = function() {};
/**
 * Build a write command document and send it through the pool as a query
 * against the database's `$cmd` namespace.
 *
 * @param {object} pool Object whose `write(cmd, opts, callback)` sends the command.
 * @param {object} bson Passed through to the `Query` constructor.
 * @param {string} type Command name; used as the key holding the collection name.
 * @param {string} opsField Key under which `ops` is stored in the command.
 * @param {string} ns Namespace in the form `db.collection`.
 * @param {Array} ops Operations to send; must not be empty.
 * @param {object|function} [options] Optional settings (`ordered`, `writeConcern`,
 *   `bypassDocumentValidation`, `serializeFunctions`, `ignoreUndefined`). When a
 *   function is passed here it is treated as the callback.
 * @param {function} callback Handed to `pool.write`; also receives any error
 *   thrown while building or writing the command.
 * @throws {MongoError} If `ops` is empty.
 */
var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
  // Name the actual command in the message; for 'insert' the text is unchanged.
  if(ops.length === 0) throw new MongoError(f("%s must contain at least one document", type));

  // Allow the options argument to be omitted in favour of the callback
  if(typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Default the options for every caller, not only the callback-only form
  options = options || {};

  // Split the ns up to get db and collection
  var p = ns.split(".");
  var d = p.shift();

  // Options
  var ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
  var writeConcern = options.writeConcern;

  // Command skeleton; the collection name keeps any further dots
  var writeCommand = {};
  writeCommand[type] = p.join('.');
  writeCommand[opsField] = ops;
  writeCommand.ordered = ordered;

  // Did we specify a write concern
  if(writeConcern && Object.keys(writeConcern).length > 0) {
    writeCommand.writeConcern = writeConcern;
  }

  // Do we have bypassDocumentValidation set, then enable it on the write command
  if(typeof options.bypassDocumentValidation === 'boolean') {
    writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
  }

  // Options object
  var opts = { command: true };
  var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
  // Keys are only checked for inserts
  if(type === 'insert') queryOptions.checkKeys = true;
  // Ensure we support serialization of functions
  if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
  // Do not serialize the undefined fields
  if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;

  try {
    // Create write command
    var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);
    // Execute command
    pool.write(cmd, opts, callback);
  } catch(err) {
    // Report synchronous failures through the callback instead of throwing
    callback(err);
  }
};
/**
 * Send an insert write command for `ops` against namespace `ns`.
 *
 * Delegates to executeWrite with command name 'insert' and the operations
 * stored under 'documents'. `ismaster` is accepted but not used here.
 *
 * Note kept from the original source: needs to support legacy mass insert as
 * well as ordered/unordered legacy.
 */
WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
  executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
};
/**
 * Send an update write command for `ops` against namespace `ns`.
 *
 * Delegates to executeWrite with command name 'update' and the operations
 * stored under 'updates'. `ismaster` is accepted but not used here.
 */
WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
  executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
};
/**
 * Send a delete write command for `ops` against namespace `ns`.
 *
 * Delegates to executeWrite with command name 'delete' and the operations
 * stored under 'deletes'. `ismaster` is accepted but not used here.
 */
WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
  executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
};
/**
 * Ask the server to kill a cursor.
 *
 * The kill message is only written when a pool is supplied and reports itself
 * as connected; it is sent with `immediateRelease` and `noResponse` set. The
 * callback, when it is a function, is always invoked with `(null, null)`
 * right away, whether or not anything was written.
 *
 * @param {object} bson Passed through to the `KillCursor` constructor.
 * @param {string} ns Namespace; not used by this implementation.
 * @param {*} cursorId Id of the cursor to kill.
 * @param {object} [pool] Object exposing `isConnected()` and `write(cmd, opts)`.
 * @param {function} [callback] Optional completion callback.
 */
WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
  // Create a kill cursor command
  var killCursor = new KillCursor(bson, [cursorId]);

  // Execute the kill cursor command
  if(pool && pool.isConnected()) {
    pool.write(killCursor, {
      immediateRelease:true, noResponse: true
    });
  }

  // Callback
  if(typeof callback === 'function') callback(null, null);
};
/**
 * Request the next batch of documents for an open cursor.
 *
 * Writes a `GetMore` message on `connection`. When the reply arrives the
 * cursor state is updated in place (`documents`, `cursorId`) and `callback`
 * is invoked with `(null, null, connection)`, where the connection is taken
 * from the reply message.
 *
 * @param {object} bson Passed through to the `GetMore` constructor.
 * @param {string} ns Namespace of the cursor.
 * @param {object} cursorState Mutable cursor state; `cursorId` is read, and
 *   `documents`/`cursorId` are overwritten on success. Its boolean
 *   `promoteLongs`, `promoteValues` and `promoteBuffers` flags are copied
 *   onto the reply handler.
 * @param {number} batchSize Sent as `numberToReturn`.
 * @param {boolean} raw When truthy, copied onto the reply handler as `raw`.
 * @param {object} connection Object exposing `write(cmd, callback)`.
 * @param {object} options Not used by this implementation.
 * @param {function} callback Receives `(err)` or `(null, null, connection)`.
 */
WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
  // Create getMore command
  var getMore = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});

  // Query callback
  var queryCallback = function(err, result) {
    if(err) return callback(err);
    // Get the raw message
    var r = result.message;

    // If we have a timed out query or a cursor that was killed
    // (signalled by bit 0 of the response flags)
    if((r.responseFlags & (1 << 0)) != 0) {
      return callback(new MongoError("cursor does not exist, was killed or timed out"), null);
    }

    // Ensure we have a Long valued cursor id
    var cursorId = typeof r.cursorId === 'number'
      ? Long.fromNumber(r.cursorId)
      : r.cursorId;

    // Set all the values
    cursorState.documents = r.documents;
    cursorState.cursorId = cursorId;

    // Return
    callback(null, null, r.connection);
  };

  // If we have a raw query decorate the function
  if(raw) {
    queryCallback.raw = raw;
  }

  // Check if we need to promote longs
  if(typeof cursorState.promoteLongs === 'boolean') {
    queryCallback.promoteLongs = cursorState.promoteLongs;
  }

  if(typeof cursorState.promoteValues === 'boolean') {
    queryCallback.promoteValues = cursorState.promoteValues;
  }

  if(typeof cursorState.promoteBuffers === 'boolean') {
    queryCallback.promoteBuffers = cursorState.promoteBuffers;
  }

  // Write out the getMore command
  connection.write(getMore, queryCallback);
};
/**
 * Build the initial query message for a cursor-producing command.
 *
 * - A command with a truthy `find` field is turned into a classic find query.
 * - Otherwise, if the cursor state already carries a cursor id, nothing is
 *   built and `undefined` is returned.
 * - Otherwise any truthy command is wrapped as a `$cmd` query.
 *
 * @param {object} bson Passed through to the query builders.
 * @param {string} ns Namespace the command targets.
 * @param {object} cmd Command document.
 * @param {object} cursorState Cursor state; `cursorId` is inspected here.
 * @param {object} topology Passed through to the query builders.
 * @param {object} [options] Passed through to the query builders.
 * @returns {object|undefined} The query built by setupClassicFind or
 *   setupCommand, or `undefined` when a cursor id is already present.
 * @throws {MongoError} If no branch applies to `cmd`.
 */
WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
  // Establish type of command
  if(cmd.find) {
    return setupClassicFind(bson, ns, cmd, cursorState, topology, options);
  } else if(cursorState.cursorId != null) {
    // Cursor already established: there is no initial query to build
    return;
  } else if(cmd) {
    return setupCommand(bson, ns, cmd, cursorState, topology, options);
  } else {
    throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
  }
};
/**
 * Build the `Query` message for a find command.
 *
 * @param {object} bson Passed through to the `Query` constructor.
 * @param {string} ns Namespace to query.
 * @param {object} cmd Find command. Fields read here: `query`, `fields`,
 *   `batchSize`, `limit`, `explain`, `readConcern`, the special modifiers
 *   (`sort`, `hint`, `snapshot`, `returnKey`, `maxScan`, `min`, `max`,
 *   `showDiskLoc`, `comment`, `maxTimeMS`) and the boolean wire flags
 *   (`tailable`, `oplogReplay`, `noCursorTimeout`, `awaitData`, `partial`).
 * @param {object} cursorState Cursor state; `limit`, `skip` and `batchSize`
 *   are read and `batchSize` is overwritten from `cmd.batchSize` when set.
 * @param {object} topology Only `topology.type` is inspected ('mongos').
 * @param {object} [options] `serializeFunctions` / `ignoreUndefined` booleans;
 *   also handed to getReadPreference.
 * @returns {object} The configured `Query` instance.
 * @throws {MongoError} If `cmd.readConcern.level` is anything but 'local'.
 */
var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
  // Ensure we have at least some options
  options = options || {};
  // Get the readPreference
  var readPreference = getReadPreference(cmd, options);
  // Set the optional batchSize
  cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
  var numberToReturn = 0;

  // Unpack the limit and batchSize values
  if(cursorState.limit == 0) {
    numberToReturn = cursorState.batchSize;
  } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
    numberToReturn = cursorState.limit;
  } else {
    numberToReturn = cursorState.batchSize;
  }

  var numberToSkip = cursorState.skip || 0;
  // Build actual find command
  var findCmd = {};
  // Using special modifier
  var usesSpecialModifier = false;

  // We have a Mongos topology, check if we need to add a readPreference
  if(topology.type == 'mongos' && readPreference) {
    findCmd['$readPreference'] = readPreference.toJSON();
    usesSpecialModifier = true;
  }

  // Add special modifiers to the query: [command field, wire field].
  // The order is significant because it is the key order of findCmd.
  var specialModifiers = [
    ['sort', 'orderby'],
    ['hint', '$hint'],
    ['snapshot', '$snapshot'],
    ['returnKey', '$returnKey'],
    ['maxScan', '$maxScan'],
    ['min', '$min'],
    ['max', '$max'],
    ['showDiskLoc', '$showDiskLoc'],
    ['comment', '$comment'],
    ['maxTimeMS', '$maxTimeMS']
  ];

  for(var i = 0; i < specialModifiers.length; i++) {
    var modifierValue = cmd[specialModifiers[i][0]];
    if(modifierValue) {
      findCmd[specialModifiers[i][1]] = modifierValue;
      usesSpecialModifier = true;
    }
  }

  if(cmd.explain) {
    // nToReturn must be 0 (match all) or negative (match N and close cursor)
    // nToReturn > 0 will give explain results equivalent to limit(0)
    numberToReturn = -Math.abs(cmd.limit || 0);
    usesSpecialModifier = true;
    findCmd['$explain'] = true;
  }

  // If we have a special modifier the filter moves under $query,
  // otherwise the filter itself is the query document
  if(usesSpecialModifier) {
    findCmd['$query'] = cmd.query;
  } else {
    findCmd = cmd.query;
  }

  // Throw on majority readConcern passed in
  if(cmd.readConcern && cmd.readConcern.level != 'local') {
    throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
  }

  // Remove readConcern, ensure no failing commands.
  // Work on a copy so the caller's command object is left untouched.
  if(cmd.readConcern) {
    cmd = copy(cmd);
    delete cmd['readConcern'];
  }

  // Serialize functions
  var serializeFunctions = typeof options.serializeFunctions === 'boolean'
    ? options.serializeFunctions : false;
  var ignoreUndefined = typeof options.ignoreUndefined === 'boolean'
    ? options.ignoreUndefined : false;

  // Build Query object
  var query = new Query(bson, ns, findCmd, {
      numberToSkip: numberToSkip, numberToReturn: numberToReturn
    , checkKeys: false, returnFieldSelector: cmd.fields
    , serializeFunctions: serializeFunctions
    , ignoreUndefined: ignoreUndefined
  });

  // Set query flags
  query.slaveOk = readPreference.slaveOk();

  // Set up the option bits for wire protocol; only explicit booleans are copied
  var wireFlags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'partial'];
  for(var j = 0; j < wireFlags.length; j++) {
    if(typeof cmd[wireFlags[j]] === 'boolean') {
      query[wireFlags[j]] = cmd[wireFlags[j]];
    }
  }

  // Return the query
  return query;
};
/**
 * Build the `Query` message that runs a command against `<db>.$cmd`.
 *
 * @param {object} bson Passed through to the `Query` constructor.
 * @param {string} ns Namespace; the part before the first dot is the database.
 * @param {object} cmd Command document. It is copied, never modified.
 * @param {object} cursorState Not used by this implementation.
 * @param {object} topology Only `topology.type` is inspected ('mongos').
 * @param {object} [options] `serializeFunctions` / `ignoreUndefined` booleans;
 *   also handed to getReadPreference.
 * @returns {object} The configured `Query` instance (numberToReturn is -1).
 * @throws {MongoError} If `cmd.readConcern.level` is anything but 'local'.
 */
var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
  // Set empty options object
  options = options || {};
  // Get the readPreference
  var readPreference = getReadPreference(cmd, options);

  // Throw on majority readConcern passed in
  if(cmd.readConcern && cmd.readConcern.level != 'local') {
    throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
  }

  // Final query. readConcern is left out of the copy so that it is really
  // absent from what is sent (ensure no failing commands) while the caller's
  // command object stays as it was.
  var finalCmd = {};
  for(var name in cmd) {
    if(name === 'readConcern' && cmd.readConcern) continue;
    finalCmd[name] = cmd[name];
  }

  // Build command namespace
  var parts = ns.split(/\./);

  // Serialize functions
  var serializeFunctions = typeof options.serializeFunctions === 'boolean'
    ? options.serializeFunctions : false;

  var ignoreUndefined = typeof options.ignoreUndefined === 'boolean'
    ? options.ignoreUndefined : false;

  // We have a Mongos topology, check if we need to add a readPreference
  if(topology.type == 'mongos'
    && readPreference
    && readPreference.preference != 'primary') {
    finalCmd = {
      '$query': finalCmd,
      '$readPreference': readPreference.toJSON()
    };
  }

  // Build Query object
  var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
      numberToSkip: 0, numberToReturn: -1
    , checkKeys: false, serializeFunctions: serializeFunctions
    , ignoreUndefined: ignoreUndefined
  });

  // Set query flags
  query.slaveOk = readPreference.slaveOk();

  // Return the query
  return query;
};
330 module.exports = WireProtocol;