33cc6d07c75a77822e7376661fc9d450fad74c1b
[aai/esr-gui.git] /
1 "use strict";
2
3 var copy = require('../connection/utils').copy
4   , retrieveBSON = require('../connection/utils').retrieveBSON
5   , KillCursor = require('../connection/commands').KillCursor
6   , GetMore = require('../connection/commands').GetMore
7   , Query = require('../connection/commands').Query
8   , f = require('util').format
9   , MongoError = require('../error')
10   , getReadPreference = require('./shared').getReadPreference;
11
12 var BSON = retrieveBSON(),
13   Long = BSON.Long;
14
15 var WireProtocol = function() {}
16
17 //
18 // Execute a write operation
19 var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
20   if(ops.length == 0) throw new MongoError("insert must contain at least one document");
21   if(typeof options == 'function') {
22     callback = options;
23     options = {};
24     options = options || {};
25   }
26
27   // Split the ns up to get db and collection
28   var p = ns.split(".");
29   var d = p.shift();
30   // Options
31   var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
32   var writeConcern = options.writeConcern;
33
34   // return skeleton
35   var writeCommand = {};
36   writeCommand[type] = p.join('.');
37   writeCommand[opsField] = ops;
38   writeCommand.ordered = ordered;
39
40   // Did we specify a write concern
41   if(writeConcern && Object.keys(writeConcern).length > 0) {
42     writeCommand.writeConcern = writeConcern;
43   }
44
45   // Do we have bypassDocumentValidation set, then enable it on the write command
46   if(typeof options.bypassDocumentValidation == 'boolean') {
47     writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
48   }
49
50   // Options object
51   var opts = { command: true };
52   var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
53   if(type == 'insert') queryOptions.checkKeys = true;
54   // Ensure we support serialization of functions
55   if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
56   // Do not serialize the undefined fields
57   if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;
58
59   try {
60     // Create write command
61     var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);
62     // Execute command
63     pool.write(cmd, opts, callback);
64   } catch(err) {
65     callback(err);
66   }
67 }
68
69 //
70 // Needs to support legacy mass insert as well as ordered/unordered legacy
71 // emulation
72 //
73 WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
74   executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
75 }
76
77 WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
78   executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
79 }
80
81 WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
82   executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
83 }
84
85 WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
86   // Create a kill cursor command
87   var killCursor = new KillCursor(bson, [cursorId]);
88   // Execute the kill cursor command
89   if(pool && pool.isConnected()) {
90     pool.write(killCursor, {
91       immediateRelease:true, noResponse: true
92     });
93   }
94
95   // Callback
96   if(typeof callback == 'function') callback(null, null);
97 }
98
99 WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
100   // Create getMore command
101   var getMore = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});
102
103   // Query callback
104   var queryCallback = function(err, result) {
105     if(err) return callback(err);
106     // Get the raw message
107     var r = result.message;
108
109     // If we have a timed out query or a cursor that was killed
110     if((r.responseFlags & (1 << 0)) != 0) {
111       return callback(new MongoError("cursor does not exist, was killed or timed out"), null);
112     }
113
114     // Ensure we have a Long valie cursor id
115     var cursorId = typeof r.cursorId == 'number'
116       ? Long.fromNumber(r.cursorId)
117       : r.cursorId;
118
119     // Set all the values
120     cursorState.documents = r.documents;
121     cursorState.cursorId = cursorId;
122
123     // Return
124     callback(null, null, r.connection);
125   }
126
127   // If we have a raw query decorate the function
128   if(raw) {
129     queryCallback.raw = raw;
130   }
131
132   // Check if we need to promote longs
133   if(typeof cursorState.promoteLongs == 'boolean') {
134     queryCallback.promoteLongs = cursorState.promoteLongs;
135   }
136
137   if(typeof cursorState.promoteValues == 'boolean') {
138     queryCallback.promoteValues = cursorState.promoteValues;
139   }
140
141   if(typeof cursorState.promoteBuffers == 'boolean') {
142     queryCallback.promoteBuffers = cursorState.promoteBuffers;
143   }
144
145   // Write out the getMore command
146   connection.write(getMore, queryCallback);
147 }
148
149 WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
150   // Establish type of command
151   if(cmd.find) {
152     return setupClassicFind(bson, ns, cmd, cursorState, topology, options)
153   } else if(cursorState.cursorId != null) {
154     return;
155   } else if(cmd) {
156     return setupCommand(bson, ns, cmd, cursorState, topology, options);
157   } else {
158     throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
159   }
160 }
161
162 //
163 // Execute a find command
164 var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
165   // Ensure we have at least some options
166   options = options || {};
167   // Get the readPreference
168   var readPreference = getReadPreference(cmd, options);
169   // Set the optional batchSize
170   cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
171   var numberToReturn = 0;
172
173   // Unpack the limit and batchSize values
174   if(cursorState.limit == 0) {
175     numberToReturn = cursorState.batchSize;
176   } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
177     numberToReturn = cursorState.limit;
178   } else {
179     numberToReturn = cursorState.batchSize;
180   }
181
182   var numberToSkip = cursorState.skip || 0;
183   // Build actual find command
184   var findCmd = {};
185   // Using special modifier
186   var usesSpecialModifier = false;
187
188   // We have a Mongos topology, check if we need to add a readPreference
189   if(topology.type == 'mongos' && readPreference) {
190     findCmd['$readPreference'] = readPreference.toJSON();
191     usesSpecialModifier = true;
192   }
193
194   // Add special modifiers to the query
195   if(cmd.sort) findCmd['orderby'] = cmd.sort, usesSpecialModifier = true;
196   if(cmd.hint) findCmd['$hint'] = cmd.hint, usesSpecialModifier = true;
197   if(cmd.snapshot) findCmd['$snapshot'] = cmd.snapshot, usesSpecialModifier = true;
198   if(cmd.returnKey) findCmd['$returnKey'] = cmd.returnKey, usesSpecialModifier = true;
199   if(cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan, usesSpecialModifier = true;
200   if(cmd.min) findCmd['$min'] = cmd.min, usesSpecialModifier = true;
201   if(cmd.max) findCmd['$max'] = cmd.max, usesSpecialModifier = true;
202   if(cmd.showDiskLoc) findCmd['$showDiskLoc'] = cmd.showDiskLoc, usesSpecialModifier = true;
203   if(cmd.comment) findCmd['$comment'] = cmd.comment, usesSpecialModifier = true;
204   if(cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS, usesSpecialModifier = true;
205
206   if(cmd.explain) {
207         // nToReturn must be 0 (match all) or negative (match N and close cursor)
208         // nToReturn > 0 will give explain results equivalent to limit(0)
209     numberToReturn = -Math.abs(cmd.limit || 0);
210     usesSpecialModifier = true;
211     findCmd['$explain'] = true;
212   }
213
214   // If we have a special modifier
215   if(usesSpecialModifier) {
216     findCmd['$query'] = cmd.query;
217   } else {
218     findCmd = cmd.query;
219   }
220
221   // Throw on majority readConcern passed in
222   if(cmd.readConcern && cmd.readConcern.level != 'local') {
223     throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
224   }
225
226   // Remove readConcern, ensure no failing commands
227   if(cmd.readConcern) {
228     cmd = copy(cmd);
229     delete cmd['readConcern'];
230   }
231
232   // Serialize functions
233   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
234     ? options.serializeFunctions : false;
235   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
236     ? options.ignoreUndefined : false;
237
238   // Build Query object
239   var query = new Query(bson, ns, findCmd, {
240       numberToSkip: numberToSkip, numberToReturn: numberToReturn
241     , checkKeys: false, returnFieldSelector: cmd.fields
242     , serializeFunctions: serializeFunctions
243     , ignoreUndefined: ignoreUndefined
244   });
245
246   // Set query flags
247   query.slaveOk = readPreference.slaveOk();
248
249   // Set up the option bits for wire protocol
250   if(typeof cmd.tailable == 'boolean') {
251     query.tailable = cmd.tailable;
252   }
253
254   if(typeof cmd.oplogReplay == 'boolean') {
255     query.oplogReplay = cmd.oplogReplay;
256   }
257
258   if(typeof cmd.noCursorTimeout == 'boolean') {
259     query.noCursorTimeout = cmd.noCursorTimeout;
260   }
261
262   if(typeof cmd.awaitData == 'boolean') {
263     query.awaitData = cmd.awaitData;
264   }
265
266   if(typeof cmd.partial == 'boolean') {
267     query.partial = cmd.partial;
268   }
269
270   // Return the query
271   return query;
272 }
273
274 //
275 // Set up a command cursor
276 var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
277   // Set empty options object
278   options = options || {}
279   // Get the readPreference
280   var readPreference = getReadPreference(cmd, options);
281
282   // Final query
283   var finalCmd = {};
284   for(var name in cmd) {
285     finalCmd[name] = cmd[name];
286   }
287
288   // Build command namespace
289   var parts = ns.split(/\./);
290
291   // Serialize functions
292   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
293     ? options.serializeFunctions : false;
294
295   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
296     ? options.ignoreUndefined : false;
297
298   // Throw on majority readConcern passed in
299   if(cmd.readConcern && cmd.readConcern.level != 'local') {
300     throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
301   }
302
303   // Remove readConcern, ensure no failing commands
304   if(cmd.readConcern) delete cmd['readConcern'];
305
306   // We have a Mongos topology, check if we need to add a readPreference
307   if(topology.type == 'mongos'
308     && readPreference
309     && readPreference.preference != 'primary') {
310     finalCmd = {
311       '$query': finalCmd,
312       '$readPreference': readPreference.toJSON()
313     };
314   }
315
316   // Build Query object
317   var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
318       numberToSkip: 0, numberToReturn: -1
319     , checkKeys: false, serializeFunctions: serializeFunctions
320     , ignoreUndefined: ignoreUndefined
321   });
322
323   // Set query flags
324   query.slaveOk = readPreference.slaveOk();
325
326   // Return the query
327   return query;
328 }
329
330 module.exports = WireProtocol;