3 var Insert = require('./commands').Insert
4 , Update = require('./commands').Update
5 , Remove = require('./commands').Remove
6 , Query = require('../connection/commands').Query
7 , copy = require('../connection/utils').copy
8 , KillCursor = require('../connection/commands').KillCursor
9 , GetMore = require('../connection/commands').GetMore
10 , Query = require('../connection/commands').Query
11 , ReadPreference = require('../topologies/read_preference')
12 , f = require('util').format
13 , CommandResult = require('../connection/command_result')
14 , MongoError = require('../error')
15 , Long = require('bson').Long
16 , getReadPreference = require('./shared').getReadPreference;
/**
 * Wire protocol implementation. The constructor keeps no state; the
 * operations are attached to its prototype.
 * @constructor
 */
var WireProtocol = function() {};
/**
 * Execute a write operation.
 *
 * Builds a write command document of the form
 *   { <type>: <collection>, <opsField>: ops, ordered: <boolean> }
 * (plus writeConcern / bypassDocumentValidation when supplied), wraps it in a
 * Query against the "<db>.$cmd" namespace and hands it to pool.write together
 * with the callback.
 *
 * @param {object} pool Object exposing write(cmd, opts, callback).
 * @param {object} bson Passed through to the Query constructor.
 * @param {string} type Key under which the collection name is stored
 *   ('insert', 'update' or 'delete' for the callers in this file).
 * @param {string} opsField Key under which ops is stored.
 * @param {string} ns Namespace; the part before the first dot is the database,
 *   the remainder is the collection.
 * @param {Array} ops Operations to send; must not be empty.
 * @param {object|function} [options] Options, or the callback when options are omitted.
 * @param {function} [callback] Passed to pool.write.
 * @throws {MongoError} When ops is empty.
 */
var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
  if(ops.length == 0) throw new MongoError("insert must contain at least one document");

  // Support the (…, ops, callback) call shape where options is omitted
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  // Default after the swap so an undefined/null options never reaches the
  // property reads below
  options = options || {};

  // Split the ns up to get db and collection
  var p = ns.split(".");
  var d = p.shift();

  var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
  var writeConcern = options.writeConcern;

  // Command skeleton; the collection name keeps any further dots
  var writeCommand = {};
  writeCommand[type] = p.join('.');
  writeCommand[opsField] = ops;
  writeCommand.ordered = ordered;

  // Did we specify a write concern
  if(writeConcern && Object.keys(writeConcern).length > 0) {
    writeCommand.writeConcern = writeConcern;
  }

  // Do we have bypassDocumentValidation set, then enable it on the write command
  if(typeof options.bypassDocumentValidation == 'boolean') {
    writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
  }

  // Options handed to pool.write
  var opts = { command: true };

  // Options handed to the Query; key checking is only enabled for inserts
  var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
  if(type == 'insert') queryOptions.checkKeys = true;
  // Ensure we support serialization of functions
  if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
  // Do not serialize the undefined fields
  if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;

  // Create write command
  var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);

  // Execute command
  pool.write(cmd, opts, callback);
};
/**
 * Insert documents by delegating to executeWrite with the 'insert' command
 * name and the 'documents' operations field.
 *
 * @param {object} pool Forwarded to executeWrite.
 * @param {object} ismaster Not used by this implementation.
 * @param {string} ns Namespace, forwarded to executeWrite.
 * @param {object} bson Forwarded to executeWrite.
 * @param {Array} ops Documents to insert.
 * @param {object|function} [options] Forwarded to executeWrite.
 * @param {function} [callback] Forwarded to executeWrite.
 */
WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
  executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
};
/**
 * Update documents by delegating to executeWrite with the 'update' command
 * name and the 'updates' operations field.
 *
 * @param {object} pool Forwarded to executeWrite.
 * @param {object} ismaster Not used by this implementation.
 * @param {string} ns Namespace, forwarded to executeWrite.
 * @param {object} bson Forwarded to executeWrite.
 * @param {Array} ops Update operations.
 * @param {object|function} [options] Forwarded to executeWrite.
 * @param {function} [callback] Forwarded to executeWrite.
 */
WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
  executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
};
/**
 * Remove documents by delegating to executeWrite with the 'delete' command
 * name and the 'deletes' operations field.
 *
 * @param {object} pool Forwarded to executeWrite.
 * @param {object} ismaster Not used by this implementation.
 * @param {string} ns Namespace, forwarded to executeWrite.
 * @param {object} bson Forwarded to executeWrite.
 * @param {Array} ops Delete operations.
 * @param {object|function} [options] Forwarded to executeWrite.
 * @param {function} [callback] Forwarded to executeWrite.
 */
WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
  executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
};
/**
 * Send a KillCursor message for a single cursor id.
 *
 * The message is only written when a pool is supplied and reports itself as
 * connected. It is written with noResponse set, and the callback (if any) is
 * invoked immediately with (null, null) whether or not anything was written.
 *
 * @param {object} bson Passed through to the KillCursor constructor.
 * @param {string} ns Not used by this implementation.
 * @param {*} cursorId Cursor id to kill.
 * @param {object} [pool] Object exposing isConnected() and write(cmd, opts).
 * @param {function} [callback] Called as callback(null, null).
 */
WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
  // Create a kill cursor command
  var killCursor = new KillCursor(bson, [cursorId]);

  // Execute the kill cursor command
  if(pool && pool.isConnected()) {
    pool.write(killCursor, {
      immediateRelease: true, noResponse: true
    });
  }

  // Callback
  if(typeof callback == 'function') callback(null, null);
};
/**
 * Fetch the next batch for an open cursor.
 *
 * Writes a GetMore message on the given connection. On a successful reply the
 * cursor state is updated in place (documents and cursorId) and the callback
 * is invoked as callback(null, null, replyConnection).
 *
 * @param {object} bson Passed through to the GetMore constructor.
 * @param {string} ns Namespace of the cursor.
 * @param {object} cursorState Cursor state; cursorId is read, documents and
 *   cursorId are overwritten from the reply.
 * @param {number} batchSize Sent as numberToReturn.
 * @param {boolean} raw When truthy, attached to the reply callback as `raw`.
 * @param {object} connection Object exposing write(cmd, callback).
 * @param {object} options Not used by this implementation.
 * @param {function} callback Receives (err) on failure or (null, null, connection).
 */
WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
  // Create getMore command
  var getMore = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});

  // Query callback
  var queryCallback = function(err, result) {
    if(err) return callback(err);
    // Get the raw message
    var r = result.message;

    // If we have a timed out query or a cursor that was killed
    // (lowest bit of responseFlags is set)
    if((r.responseFlags & (1 << 0)) != 0) {
      return callback(new MongoError("cursor does not exist, was killed or timed out"), null);
    }

    // Ensure we have a Long valued cursor id
    var cursorId = typeof r.cursorId == 'number'
      ? Long.fromNumber(r.cursorId)
      : r.cursorId;

    // Set all the values
    cursorState.documents = r.documents;
    cursorState.cursorId = cursorId;

    // Return
    callback(null, null, r.connection);
  };

  // If we have a raw query decorate the function
  if(raw) {
    queryCallback.raw = raw;
  }

  // Forward the promote* settings only when explicitly set as booleans on the
  // cursor state
  if(typeof cursorState.promoteLongs == 'boolean') {
    queryCallback.promoteLongs = cursorState.promoteLongs;
  }

  if(typeof cursorState.promoteValues == 'boolean') {
    queryCallback.promoteValues = cursorState.promoteValues;
  }

  if(typeof cursorState.promoteBuffers == 'boolean') {
    queryCallback.promoteBuffers = cursorState.promoteBuffers;
  }

  // Write out the getMore command
  connection.write(getMore, queryCallback);
};
/**
 * Build the Query used to start a cursor for the given command.
 *
 * - A command with a `find` field is built by setupClassicFind.
 * - When the cursor state already carries a cursor id, nothing is built and
 *   undefined is returned.
 * - Any other truthy command is built by setupCommand.
 *
 * NOTE(review): the branch layout was rebuilt from an incomplete listing;
 * confirm the `find` test and the empty cursor-id branch against the
 * canonical file.
 *
 * @param {object} bson Forwarded to the builder.
 * @param {string} ns Namespace, forwarded to the builder.
 * @param {object} cmd Command document.
 * @param {object} cursorState Cursor state; cursorId is inspected here.
 * @param {object} topology Forwarded to the builder.
 * @param {object} [options] Forwarded to the builder.
 * @returns {object|undefined} The built Query, or undefined when a cursor id already exists.
 * @throws {MongoError} When no command is supplied and no cursor id exists.
 */
WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
  // Establish type of command
  if(cmd && cmd.find) {
    return setupClassicFind(bson, ns, cmd, cursorState, topology, options);
  } else if(cursorState.cursorId != null) {
    // An existing cursor needs no initial query
    return;
  } else if(cmd) {
    return setupCommand(bson, ns, cmd, cursorState, topology, options);
  } else {
    throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
  }
};
// Find command fields and the key each one is stored under in the query
// document. Order matters: it is the order in which the keys are added.
var CLASSIC_FIND_MODIFIERS = [
  ['sort', 'orderby'],
  ['hint', '$hint'],
  ['snapshot', '$snapshot'],
  ['returnKey', '$returnKey'],
  ['maxScan', '$maxScan'],
  ['min', '$min'],
  ['max', '$max'],
  ['showDiskLoc', '$showDiskLoc'],
  ['comment', '$comment'],
  ['maxTimeMS', '$maxTimeMS']
];

// Find command fields copied onto the Query when given as booleans.
var CLASSIC_FIND_FLAGS = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'partial'];

/**
 * Build the Query for a find command.
 *
 * When any modifier (sort, hint, explain, ... or a mongos read preference) is
 * present, the filter is nested under '$query' next to the modifiers;
 * otherwise the filter itself is sent as the query document.
 *
 * Side effect: cursorState.batchSize is overwritten with cmd.batchSize when
 * the latter is truthy.
 *
 * @param {object} bson Passed through to the Query constructor.
 * @param {string} ns Namespace the query is issued against.
 * @param {object} cmd Find command (query, fields, sort, limit, ...).
 * @param {object} cursorState Cursor state (limit, batchSize, skip).
 * @param {object} topology Topology; only `type` is inspected.
 * @param {object} [options] serializeFunctions / ignoreUndefined, plus whatever
 *   getReadPreference reads.
 * @returns {object} The configured Query.
 * @throws {MongoError} When cmd.readConcern has a level other than 'local'.
 */
var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
  // Ensure we have at least some options
  options = options || {};
  // Get the readPreference
  var readPreference = getReadPreference(cmd, options);
  // Set the optional batchSize
  cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
  var numberToReturn = 0;

  // Unpack the limit and batchSize values: the limit wins when it is negative,
  // smaller than the batch size, or the only one of the two that is set
  if(cursorState.limit == 0) {
    numberToReturn = cursorState.batchSize;
  } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
    numberToReturn = cursorState.limit;
  } else {
    numberToReturn = cursorState.batchSize;
  }

  var numberToSkip = cursorState.skip || 0;
  // Build actual find command
  var findCmd = {};
  // Using special modifier
  var usesSpecialModifier = false;

  // We have a Mongos topology, check if we need to add a readPreference
  if(topology.type == 'mongos' && readPreference) {
    findCmd['$readPreference'] = readPreference.toJSON();
    usesSpecialModifier = true;
  }

  // Add special modifiers to the query
  for(var i = 0; i < CLASSIC_FIND_MODIFIERS.length; i++) {
    var field = CLASSIC_FIND_MODIFIERS[i][0];
    if(cmd[field]) {
      findCmd[CLASSIC_FIND_MODIFIERS[i][1]] = cmd[field];
      usesSpecialModifier = true;
    }
  }

  if(cmd.explain) {
    // nToReturn must be 0 (match all) or negative (match N and close cursor)
    // nToReturn > 0 will give explain results equivalent to limit(0)
    numberToReturn = -Math.abs(cmd.limit || 0);
    usesSpecialModifier = true;
    findCmd['$explain'] = true;
  }

  // If we have a special modifier the filter is nested, otherwise it is the
  // whole query document
  if(usesSpecialModifier) {
    findCmd['$query'] = cmd.query;
  } else {
    findCmd = cmd.query;
  }

  // Throw on majority readConcern passed in
  if(cmd.readConcern && cmd.readConcern.level != 'local') {
    throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
  }

  // Remove readConcern, ensure no failing commands. Work on a copy so the
  // caller's command object is left untouched.
  if(cmd.readConcern) {
    cmd = copy(cmd);
    delete cmd['readConcern'];
  }

  // Serialize functions
  var serializeFunctions = typeof options.serializeFunctions == 'boolean'
    ? options.serializeFunctions : false;
  var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
    ? options.ignoreUndefined : false;

  // Build Query object
  var query = new Query(bson, ns, findCmd, {
      numberToSkip: numberToSkip, numberToReturn: numberToReturn
    , checkKeys: false, returnFieldSelector: cmd.fields
    , serializeFunctions: serializeFunctions
    , ignoreUndefined: ignoreUndefined
  });

  // Set query flags
  query.slaveOk = readPreference.slaveOk();

  // Set up the option bits for wire protocol
  for(var j = 0; j < CLASSIC_FIND_FLAGS.length; j++) {
    var flag = CLASSIC_FIND_FLAGS[j];
    if(typeof cmd[flag] == 'boolean') {
      query[flag] = cmd[flag];
    }
  }

  // Return the query
  return query;
};
/**
 * Build the Query for a generic (non-find) command.
 *
 * The command is shallow-copied, stripped of readConcern, and sent against
 * the "<db>.$cmd" namespace, where <db> is the part of ns before the first
 * dot. On a mongos topology with a non-primary read preference the command is
 * wrapped as { $query: <command>, $readPreference: <preference> }.
 *
 * The caller's cmd object is not modified.
 *
 * @param {object} bson Passed through to the Query constructor.
 * @param {string} ns Namespace; only the database part is used.
 * @param {object} cmd Command document.
 * @param {object} cursorState Not used by this implementation.
 * @param {object} topology Topology; only `type` is inspected.
 * @param {object} [options] serializeFunctions / ignoreUndefined, plus whatever
 *   getReadPreference reads.
 * @returns {object} The configured Query.
 * @throws {MongoError} When cmd.readConcern has a level other than 'local'.
 */
var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
  // Set empty options object
  options = options || {};
  // Get the readPreference
  var readPreference = getReadPreference(cmd, options);

  // Final query: shallow copy of the command
  var finalCmd = {};
  for(var name in cmd) {
    finalCmd[name] = cmd[name];
  }

  // Build command namespace
  var parts = ns.split(/\./);

  // Serialize functions
  var serializeFunctions = typeof options.serializeFunctions == 'boolean'
    ? options.serializeFunctions : false;

  var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
    ? options.ignoreUndefined : false;

  // Throw on majority readConcern passed in
  if(cmd.readConcern && cmd.readConcern.level != 'local') {
    throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
  }

  // Remove readConcern, ensure no failing commands. It has to be removed from
  // the copy that is actually sent; deleting it from cmd after the copy was
  // taken would leave it in the outgoing command.
  if(cmd.readConcern) delete finalCmd['readConcern'];

  // We have a Mongos topology, check if we need to add a readPreference
  if(topology.type == 'mongos'
    && readPreference
    && readPreference.preference != 'primary') {
    finalCmd = {
      '$query': finalCmd,
      '$readPreference': readPreference.toJSON()
    };
  }

  // Build Query object
  var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
      numberToSkip: 0, numberToReturn: -1
    , checkKeys: false, serializeFunctions: serializeFunctions
    , ignoreUndefined: ignoreUndefined
  });

  // Set query flags
  query.slaveOk = readPreference.slaveOk();

  // Return the query
  return query;
};
332 module.exports = WireProtocol;