8e83eb42ee54ac70152991a4667a157e82565668
[aai/esr-gui.git] /
1 "use strict";
2
3 var Insert = require('./commands').Insert
4   , Update = require('./commands').Update
5   , Remove = require('./commands').Remove
6   , Query = require('../connection/commands').Query
7   , copy = require('../connection/utils').copy
8   , KillCursor = require('../connection/commands').KillCursor
9   , GetMore = require('../connection/commands').GetMore
10   , Query = require('../connection/commands').Query
11   , ReadPreference = require('../topologies/read_preference')
12   , f = require('util').format
13   , CommandResult = require('../connection/command_result')
14   , MongoError = require('../error')
15   , Long = require('bson').Long
16   , getReadPreference = require('./shared').getReadPreference;
17
18 var WireProtocol = function() {}
19
20 //
21 // Execute a write operation
22 var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
23   if(ops.length == 0) throw new MongoError("insert must contain at least one document");
24   if(typeof options == 'function') {
25     callback = options;
26     options = {};
27     options = options || {};
28   }
29
30   // Split the ns up to get db and collection
31   var p = ns.split(".");
32   var d = p.shift();
33   // Options
34   var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
35   var writeConcern = options.writeConcern;
36
37   // return skeleton
38   var writeCommand = {};
39   writeCommand[type] = p.join('.');
40   writeCommand[opsField] = ops;
41   writeCommand.ordered = ordered;
42
43   // Did we specify a write concern
44   if(writeConcern && Object.keys(writeConcern).length > 0) {
45     writeCommand.writeConcern = writeConcern;
46   }
47
48   // Do we have bypassDocumentValidation set, then enable it on the write command
49   if(typeof options.bypassDocumentValidation == 'boolean') {
50     writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
51   }
52
53   // Options object
54   var opts = { command: true };
55   var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
56   if(type == 'insert') queryOptions.checkKeys = true;
57   // Ensure we support serialization of functions
58   if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
59   // Do not serialize the undefined fields
60   if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;
61
62   try {
63     // Create write command
64     var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);
65     // Execute command
66     pool.write(cmd, opts, callback);
67   } catch(err) {
68     callback(err);
69   }
70 }
71
72 //
73 // Needs to support legacy mass insert as well as ordered/unordered legacy
74 // emulation
75 //
76 WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
77   executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
78 }
79
80 WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
81   executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
82 }
83
84 WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
85   executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
86 }
87
88 WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
89   // Create a kill cursor command
90   var killCursor = new KillCursor(bson, [cursorId]);
91   // Execute the kill cursor command
92   if(pool && pool.isConnected()) {
93     pool.write(killCursor, {
94       immediateRelease:true, noResponse: true
95     });
96   }
97
98   // Callback
99   if(typeof callback == 'function') callback(null, null);
100 }
101
102 WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
103   // Create getMore command
104   var getMore = new GetMore(bson, ns, cursorState.cursorId, {numberToReturn: batchSize});
105
106   // Query callback
107   var queryCallback = function(err, result) {
108     if(err) return callback(err);
109     // Get the raw message
110     var r = result.message;
111
112     // If we have a timed out query or a cursor that was killed
113     if((r.responseFlags & (1 << 0)) != 0) {
114       return callback(new MongoError("cursor does not exist, was killed or timed out"), null);
115     }
116
117     // Ensure we have a Long valie cursor id
118     var cursorId = typeof r.cursorId == 'number'
119       ? Long.fromNumber(r.cursorId)
120       : r.cursorId;
121
122     // Set all the values
123     cursorState.documents = r.documents;
124     cursorState.cursorId = cursorId;
125
126     // Return
127     callback(null, null, r.connection);
128   }
129
130   // If we have a raw query decorate the function
131   if(raw) {
132     queryCallback.raw = raw;
133   }
134
135   // Check if we need to promote longs
136   if(typeof cursorState.promoteLongs == 'boolean') {
137     queryCallback.promoteLongs = cursorState.promoteLongs;
138   }
139
140   if(typeof cursorState.promoteValues == 'boolean') {
141     queryCallback.promoteValues = cursorState.promoteValues;
142   }
143
144   if(typeof cursorState.promoteBuffers == 'boolean') {
145     queryCallback.promoteBuffers = cursorState.promoteBuffers;
146   }
147
148   // Write out the getMore command
149   connection.write(getMore, queryCallback);
150 }
151
152 WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
153   // Establish type of command
154   if(cmd.find) {
155     return setupClassicFind(bson, ns, cmd, cursorState, topology, options)
156   } else if(cursorState.cursorId != null) {
157   } else if(cmd) {
158     return setupCommand(bson, ns, cmd, cursorState, topology, options);
159   } else {
160     throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
161   }
162 }
163
164 //
165 // Execute a find command
166 var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) {
167   // Ensure we have at least some options
168   options = options || {};
169   // Get the readPreference
170   var readPreference = getReadPreference(cmd, options);
171   // Set the optional batchSize
172   cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
173   var numberToReturn = 0;
174
175   // Unpack the limit and batchSize values
176   if(cursorState.limit == 0) {
177     numberToReturn = cursorState.batchSize;
178   } else if(cursorState.limit < 0 || cursorState.limit < cursorState.batchSize || (cursorState.limit > 0 && cursorState.batchSize == 0)) {
179     numberToReturn = cursorState.limit;
180   } else {
181     numberToReturn = cursorState.batchSize;
182   }
183
184   var numberToSkip = cursorState.skip || 0;
185   // Build actual find command
186   var findCmd = {};
187   // Using special modifier
188   var usesSpecialModifier = false;
189
190   // We have a Mongos topology, check if we need to add a readPreference
191   if(topology.type == 'mongos' && readPreference) {
192     findCmd['$readPreference'] = readPreference.toJSON();
193     usesSpecialModifier = true;
194   }
195
196   // Add special modifiers to the query
197   if(cmd.sort) findCmd['orderby'] = cmd.sort, usesSpecialModifier = true;
198   if(cmd.hint) findCmd['$hint'] = cmd.hint, usesSpecialModifier = true;
199   if(cmd.snapshot) findCmd['$snapshot'] = cmd.snapshot, usesSpecialModifier = true;
200   if(cmd.returnKey) findCmd['$returnKey'] = cmd.returnKey, usesSpecialModifier = true;
201   if(cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan, usesSpecialModifier = true;
202   if(cmd.min) findCmd['$min'] = cmd.min, usesSpecialModifier = true;
203   if(cmd.max) findCmd['$max'] = cmd.max, usesSpecialModifier = true;
204   if(cmd.showDiskLoc) findCmd['$showDiskLoc'] = cmd.showDiskLoc, usesSpecialModifier = true;
205   if(cmd.comment) findCmd['$comment'] = cmd.comment, usesSpecialModifier = true;
206   if(cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS, usesSpecialModifier = true;
207
208   if(cmd.explain) {
209         // nToReturn must be 0 (match all) or negative (match N and close cursor)
210         // nToReturn > 0 will give explain results equivalent to limit(0)
211     numberToReturn = -Math.abs(cmd.limit || 0);
212     usesSpecialModifier = true;
213     findCmd['$explain'] = true;
214   }
215
216   // If we have a special modifier
217   if(usesSpecialModifier) {
218     findCmd['$query'] = cmd.query;
219   } else {
220     findCmd = cmd.query;
221   }
222
223   // Throw on majority readConcern passed in
224   if(cmd.readConcern && cmd.readConcern.level != 'local') {
225     throw new MongoError(f('server find command does not support a readConcern level of %s', cmd.readConcern.level));
226   }
227
228   // Remove readConcern, ensure no failing commands
229   if(cmd.readConcern) {
230     cmd = copy(cmd);
231     delete cmd['readConcern'];
232   }
233
234   // Serialize functions
235   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
236     ? options.serializeFunctions : false;
237   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
238     ? options.ignoreUndefined : false;
239
240   // Build Query object
241   var query = new Query(bson, ns, findCmd, {
242       numberToSkip: numberToSkip, numberToReturn: numberToReturn
243     , checkKeys: false, returnFieldSelector: cmd.fields
244     , serializeFunctions: serializeFunctions
245     , ignoreUndefined: ignoreUndefined
246   });
247
248   // Set query flags
249   query.slaveOk = readPreference.slaveOk();
250
251   // Set up the option bits for wire protocol
252   if(typeof cmd.tailable == 'boolean') {
253     query.tailable = cmd.tailable;
254   }
255
256   if(typeof cmd.oplogReplay == 'boolean') {
257     query.oplogReplay = cmd.oplogReplay;
258   }
259
260   if(typeof cmd.noCursorTimeout == 'boolean') {
261     query.noCursorTimeout = cmd.noCursorTimeout;
262   }
263
264   if(typeof cmd.awaitData == 'boolean') {
265     query.awaitData = cmd.awaitData;
266   }
267
268   if(typeof cmd.partial == 'boolean') {
269     query.partial = cmd.partial;
270   }
271
272   // Return the query
273   return query;
274 }
275
276 //
277 // Set up a command cursor
278 var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
279   // Set empty options object
280   options = options || {}
281   // Get the readPreference
282   var readPreference = getReadPreference(cmd, options);
283
284   // Final query
285   var finalCmd = {};
286   for(var name in cmd) {
287     finalCmd[name] = cmd[name];
288   }
289
290   // Build command namespace
291   var parts = ns.split(/\./);
292
293   // Serialize functions
294   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
295     ? options.serializeFunctions : false;
296
297   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
298     ? options.ignoreUndefined : false;
299
300   // Throw on majority readConcern passed in
301   if(cmd.readConcern && cmd.readConcern.level != 'local') {
302     throw new MongoError(f('server %s command does not support a readConcern level of %s', JSON.stringify(cmd), cmd.readConcern.level));
303   }
304
305   // Remove readConcern, ensure no failing commands
306   if(cmd.readConcern) delete cmd['readConcern'];
307
308   // We have a Mongos topology, check if we need to add a readPreference
309   if(topology.type == 'mongos'
310     && readPreference
311     && readPreference.preference != 'primary') {
312     finalCmd = {
313       '$query': finalCmd,
314       '$readPreference': readPreference.toJSON()
315     };
316   }
317
318   // Build Query object
319   var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
320       numberToSkip: 0, numberToReturn: -1
321     , checkKeys: false, serializeFunctions: serializeFunctions
322     , ignoreUndefined: ignoreUndefined
323   });
324
325   // Set query flags
326   query.slaveOk = readPreference.slaveOk();
327
328   // Return the query
329   return query;
330 }
331
332 module.exports = WireProtocol;