9f1410eebc44da5b4a63ac09860db62bfc4f5afa
[aai/esr-gui.git] /
1 "use strict";
2
3 var Query = require('../connection/commands').Query
4   , retrieveBSON = require('../connection/utils').retrieveBSON
5   , f = require('util').format
6   , MongoError = require('../error')
7   , getReadPreference = require('./shared').getReadPreference;
8
9 var BSON = retrieveBSON(),
10   Long = BSON.Long;
11
12 var WireProtocol = function(legacyWireProtocol) {
13   this.legacyWireProtocol = legacyWireProtocol;
14 }
15
16 //
17 // Execute a write operation
18 var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
19   if(ops.length == 0) throw new MongoError("insert must contain at least one document");
20   if(typeof options == 'function') {
21     callback = options;
22     options = {};
23     options = options || {};
24   }
25
26   // Split the ns up to get db and collection
27   var p = ns.split(".");
28   var d = p.shift();
29   // Options
30   var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
31   var writeConcern = options.writeConcern;
32
33   // return skeleton
34   var writeCommand = {};
35   writeCommand[type] = p.join('.');
36   writeCommand[opsField] = ops;
37   writeCommand.ordered = ordered;
38
39   // Did we specify a write concern
40   if(writeConcern && Object.keys(writeConcern).length > 0) {
41     writeCommand.writeConcern = writeConcern;
42   }
43
44   // If we have collation passed in
45   if(options.collation) {
46     for(var i = 0; i < writeCommand[opsField].length; i++) {
47       if(!writeCommand[opsField][i].collation) {
48         writeCommand[opsField][i].collation = options.collation;
49       }
50     }
51   }
52
53   // Do we have bypassDocumentValidation set, then enable it on the write command
54   if(typeof options.bypassDocumentValidation == 'boolean') {
55     writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
56   }
57
58   // Options object
59   var opts = { command: true };
60   var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
61   if(type == 'insert') queryOptions.checkKeys = true;
62
63   // Ensure we support serialization of functions
64   if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
65   // Do not serialize the undefined fields
66   if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;
67
68   try {
69     // Create write command
70     var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);
71     // Execute command
72     pool.write(cmd, opts, callback);
73   } catch(err) {
74     callback(err);
75   }
76 }
77
78 //
79 // Needs to support legacy mass insert as well as ordered/unordered legacy
80 // emulation
81 //
82 WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
83   executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
84 }
85
86 WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
87   executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
88 }
89
90 WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
91   executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
92 }
93
94 WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
95   // Build command namespace
96   var parts = ns.split(/\./);
97   // Command namespace
98   var commandns = f('%s.$cmd', parts.shift());
99   // Create getMore command
100   var killcursorCmd = {
101     killCursors: parts.join('.'),
102     cursors: [cursorId]
103   }
104
105   // Build Query object
106   var query = new Query(bson, commandns, killcursorCmd, {
107       numberToSkip: 0, numberToReturn: -1
108     , checkKeys: false, returnFieldSelector: null
109   });
110
111   // Set query flags
112   query.slaveOk = true;
113
114   // Kill cursor callback
115   var killCursorCallback = function(err, result) {
116     if(err) {
117       if(typeof callback != 'function') return;
118       return callback(err);
119     }
120
121     // Result
122     var r = result.message;
123     // If we have a timed out query or a cursor that was killed
124     if((r.responseFlags & (1 << 0)) != 0) {
125       if(typeof callback != 'function') return;
126       return callback(new MongoError("cursor killed or timed out"), null);
127     }
128
129     if(!Array.isArray(r.documents) || r.documents.length == 0) {
130       if(typeof callback != 'function') return;
131       return callback(new MongoError(f('invalid killCursors result returned for cursor id %s', cursorId)));
132     }
133
134     // Return the result
135     if(typeof callback == 'function') {
136       callback(null, r.documents[0]);
137     }
138   }
139
140   // Execute the kill cursor command
141   if(pool && pool.isConnected()) {
142     pool.write(query, {
143       command: true
144     }, killCursorCallback);
145   }
146 }
147
148 WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
149   options = options || {};
150   // Build command namespace
151   var parts = ns.split(/\./);
152   // Command namespace
153   var commandns = f('%s.$cmd', parts.shift());
154
155   // Create getMore command
156   var getMoreCmd = {
157     getMore: cursorState.cursorId,
158     collection: parts.join('.'),
159     batchSize: Math.abs(batchSize)
160   }
161
162   if(cursorState.cmd.tailable
163     && typeof cursorState.cmd.maxAwaitTimeMS == 'number') {
164     getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS;
165   }
166
167   // Build Query object
168   var query = new Query(bson, commandns, getMoreCmd, {
169       numberToSkip: 0, numberToReturn: -1
170     , checkKeys: false, returnFieldSelector: null
171   });
172
173   // Set query flags
174   query.slaveOk = true;
175
176   // Query callback
177   var queryCallback = function(err, result) {
178     if(err) return callback(err);
179     // Get the raw message
180     var r = result.message;
181
182     // If we have a timed out query or a cursor that was killed
183     if((r.responseFlags & (1 << 0)) != 0) {
184       return callback(new MongoError("cursor killed or timed out"), null);
185     }
186
187     // Raw, return all the extracted documents
188     if(raw) {
189       cursorState.documents = r.documents;
190       cursorState.cursorId = r.cursorId;
191       return callback(null, r.documents);
192     }
193
194     // We have an error detected
195     if(r.documents[0].ok == 0) {
196       return callback(MongoError.create(r.documents[0]));
197     }
198
199     // Ensure we have a Long valid cursor id
200     var cursorId = typeof r.documents[0].cursor.id == 'number'
201       ? Long.fromNumber(r.documents[0].cursor.id)
202       : r.documents[0].cursor.id;
203
204     // Set all the values
205     cursorState.documents = r.documents[0].cursor.nextBatch;
206     cursorState.cursorId = cursorId;
207
208     // Return the result
209     callback(null, r.documents[0], r.connection);
210   }
211
212   // Query options
213   var queryOptions = { command: true };
214
215   // If we have a raw query decorate the function
216   if(raw) {
217     queryOptions.raw = raw;
218   }
219
220   // Add the result field needed
221   queryOptions.documentsReturnedIn = 'nextBatch';
222
223   // Check if we need to promote longs
224   if(typeof cursorState.promoteLongs == 'boolean') {
225     queryOptions.promoteLongs = cursorState.promoteLongs;
226   }
227
228   if(typeof cursorState.promoteValues == 'boolean') {
229     queryCallback.promoteValues = cursorState.promoteValues;
230   }
231
232   if(typeof cursorState.promoteBuffers == 'boolean') {
233     queryCallback.promoteBuffers = cursorState.promoteBuffers;
234   }
235
236   // Write out the getMore command
237   connection.write(query, queryOptions, queryCallback);
238 }
239
240 WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
241   // Establish type of command
242   if(cmd.find) {
243     // Create the find command
244     var query = executeFindCommand(bson, ns, cmd, cursorState, topology, options)
245     // Mark the cmd as virtual
246     cmd.virtual = false;
247     // Signal the documents are in the firstBatch value
248     query.documentsReturnedIn = 'firstBatch';
249     // Return the query
250     return query;
251   } else if(cursorState.cursorId != null) {
252     return;
253   } else if(cmd) {
254     return setupCommand(bson, ns, cmd, cursorState, topology, options);
255   } else {
256     throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
257   }
258 }
259
260 // // Command
261 // {
262 //     find: ns
263 //   , query: <object>
264 //   , limit: <n>
265 //   , fields: <object>
266 //   , skip: <n>
267 //   , hint: <string>
268 //   , explain: <boolean>
269 //   , snapshot: <boolean>
270 //   , batchSize: <n>
271 //   , returnKey: <boolean>
272 //   , maxScan: <n>
273 //   , min: <n>
274 //   , max: <n>
275 //   , showDiskLoc: <boolean>
276 //   , comment: <string>
277 //   , maxTimeMS: <n>
278 //   , raw: <boolean>
279 //   , readPreference: <ReadPreference>
280 //   , tailable: <boolean>
281 //   , oplogReplay: <boolean>
282 //   , noCursorTimeout: <boolean>
283 //   , awaitdata: <boolean>
284 //   , exhaust: <boolean>
285 //   , partial: <boolean>
286 // }
287
288 // FIND/GETMORE SPEC
289 // {
290 //     “find”: <string>,
291 //     “filter”: { ... },
292 //     “sort”: { ... },
293 //     “projection”: { ... },
294 //     “hint”: { ... },
295 //     “skip”: <int>,
296 //     “limit”: <int>,
297 //     “batchSize”: <int>,
298 //     “singleBatch”: <bool>,
299 //     “comment”: <string>,
300 //     “maxScan”: <int>,
301 //     “maxTimeMS”: <int>,
302 //     “max”: { ... },
303 //     “min”: { ... },
304 //     “returnKey”: <bool>,
305 //     “showRecordId”: <bool>,
306 //     “snapshot”: <bool>,
307 //     “tailable”: <bool>,
308 //     “oplogReplay”: <bool>,
309 //     “noCursorTimeout”: <bool>,
310 //     “awaitData”: <bool>,
311 //     “partial”: <bool>,
312 //     “$readPreference”: { ... }
313 // }
314
315 //
316 // Execute a find command
317 var executeFindCommand = function(bson, ns, cmd, cursorState, topology, options) {
318   // Ensure we have at least some options
319   options = options || {};
320   // Get the readPreference
321   var readPreference = getReadPreference(cmd, options);
322   // Set the optional batchSize
323   cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
324
325   // Build command namespace
326   var parts = ns.split(/\./);
327   // Command namespace
328   var commandns = f('%s.$cmd', parts.shift());
329
330   // Build actual find command
331   var findCmd = {
332     find: parts.join('.')
333   };
334
335   // I we provided a filter
336   if(cmd.query) {
337     // Check if the user is passing in the $query parameter
338     if(cmd.query['$query']) {
339       findCmd.filter = cmd.query['$query'];
340     } else {
341       findCmd.filter = cmd.query;
342     }
343   }
344
345   // Sort value
346   var sortValue = cmd.sort;
347
348   // Handle issue of sort being an Array
349   if(Array.isArray(sortValue)) {
350     var sortObject = {};
351
352     if(sortValue.length > 0 && !Array.isArray(sortValue[0])) {
353       var sortDirection = sortValue[1];
354       // Translate the sort order text
355       if(sortDirection == 'asc') {
356         sortDirection = 1;
357       } else if(sortDirection == 'desc') {
358         sortDirection = -1;
359       }
360
361       // Set the sort order
362       sortObject[sortValue[0]] = sortDirection;
363     } else {
364       for(var i = 0; i < sortValue.length; i++) {
365         sortDirection = sortValue[i][1];
366         // Translate the sort order text
367         if(sortDirection == 'asc') {
368           sortDirection = 1;
369         } else if(sortDirection == 'desc') {
370           sortDirection = -1;
371         }
372
373         // Set the sort order
374         sortObject[sortValue[i][0]] = sortDirection;
375       }
376     }
377
378     sortValue = sortObject;
379   }
380
381   // Add sort to command
382   if(cmd.sort) findCmd.sort = sortValue;
383   // Add a projection to the command
384   if(cmd.fields) findCmd.projection = cmd.fields;
385   // Add a hint to the command
386   if(cmd.hint) findCmd.hint = cmd.hint;
387   // Add a skip
388   if(cmd.skip) findCmd.skip = cmd.skip;
389   // Add a limit
390   if(cmd.limit) findCmd.limit = cmd.limit;
391   // Add a batchSize
392   if(typeof cmd.batchSize == 'number') findCmd.batchSize = Math.abs(cmd.batchSize);
393
394   // Check if we wish to have a singleBatch
395   if(cmd.limit < 0) {
396     findCmd.limit = Math.abs(cmd.limit);
397     findCmd.singleBatch = true;
398   }
399
400   // If we have comment set
401   if(cmd.comment) findCmd.comment = cmd.comment;
402
403   // If we have maxScan
404   if(cmd.maxScan) findCmd.maxScan = cmd.maxScan;
405
406   // If we have maxTimeMS set
407   if(cmd.maxTimeMS) findCmd.maxTimeMS = cmd.maxTimeMS;
408
409   // If we have min
410   if(cmd.min) findCmd.min = cmd.min;
411
412   // If we have max
413   if(cmd.max) findCmd.max = cmd.max;
414
415   // If we have returnKey set
416   if(cmd.returnKey) findCmd.returnKey = cmd.returnKey;
417
418   // If we have showDiskLoc set
419   if(cmd.showDiskLoc) findCmd.showRecordId = cmd.showDiskLoc;
420
421   // If we have snapshot set
422   if(cmd.snapshot) findCmd.snapshot = cmd.snapshot;
423
424   // If we have tailable set
425   if(cmd.tailable) findCmd.tailable = cmd.tailable;
426
427   // If we have oplogReplay set
428   if(cmd.oplogReplay) findCmd.oplogReplay = cmd.oplogReplay;
429
430   // If we have noCursorTimeout set
431   if(cmd.noCursorTimeout) findCmd.noCursorTimeout = cmd.noCursorTimeout;
432
433   // If we have awaitData set
434   if(cmd.awaitData) findCmd.awaitData = cmd.awaitData;
435   if(cmd.awaitdata) findCmd.awaitData = cmd.awaitdata;
436
437   // If we have partial set
438   if(cmd.partial) findCmd.partial = cmd.partial;
439
440   // If we have collation passed in
441   if(cmd.collation) findCmd.collation = cmd.collation;
442
443   // If we have explain, we need to rewrite the find command
444   // to wrap it in the explain command
445   if(cmd.explain) {
446     findCmd = {
447       explain: findCmd
448     }
449   }
450
451   // Did we provide a readConcern
452   if(cmd.readConcern) findCmd.readConcern = cmd.readConcern;
453
454   // Set up the serialize and ignoreUndefined fields
455   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
456     ? options.serializeFunctions : false;
457   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
458     ? options.ignoreUndefined : false;
459
460   // We have a Mongos topology, check if we need to add a readPreference
461   if(topology.type == 'mongos'
462     && readPreference
463     && readPreference.preference != 'primary') {
464     findCmd = {
465       '$query': findCmd,
466       '$readPreference': readPreference.toJSON()
467     };
468   }
469
470   // Build Query object
471   var query = new Query(bson, commandns, findCmd, {
472       numberToSkip: 0, numberToReturn: 1
473     , checkKeys: false, returnFieldSelector: null
474     , serializeFunctions: serializeFunctions, ignoreUndefined: ignoreUndefined
475   });
476
477   // Set query flags
478   query.slaveOk = readPreference.slaveOk();
479
480   // Return the query
481   return query;
482 }
483
484 //
485 // Set up a command cursor
486 var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
487   // Set empty options object
488   options = options || {}
489   // Get the readPreference
490   var readPreference = getReadPreference(cmd, options);
491
492   // Final query
493   var finalCmd = {};
494   for(var name in cmd) {
495     finalCmd[name] = cmd[name];
496   }
497
498   // Build command namespace
499   var parts = ns.split(/\./);
500
501   // Serialize functions
502   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
503     ? options.serializeFunctions : false;
504
505   // Set up the serialize and ignoreUndefined fields
506   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
507     ? options.ignoreUndefined : false;
508
509   // We have a Mongos topology, check if we need to add a readPreference
510   if(topology.type == 'mongos'
511     && readPreference
512     && readPreference.preference != 'primary') {
513     finalCmd = {
514       '$query': finalCmd,
515       '$readPreference': readPreference.toJSON()
516     };
517   }
518
519   // Build Query object
520   var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
521       numberToSkip: 0, numberToReturn: -1
522     , checkKeys: false, serializeFunctions: serializeFunctions
523     , ignoreUndefined: ignoreUndefined
524   });
525
526   // Set query flags
527   query.slaveOk = readPreference.slaveOk();
528
529   // Return the query
530   return query;
531 }
532
533 module.exports = WireProtocol;