b57048d85666ff6dff7fbf6f3045d5985d5184c4
[aai/esr-gui.git] /
1 "use strict";
2
3 var Insert = require('./commands').Insert
4   , Update = require('./commands').Update
5   , Remove = require('./commands').Remove
6   , Query = require('../connection/commands').Query
7   , copy = require('../connection/utils').copy
8   , KillCursor = require('../connection/commands').KillCursor
9   , GetMore = require('../connection/commands').GetMore
10   , Query = require('../connection/commands').Query
11   , ReadPreference = require('../topologies/read_preference')
12   , f = require('util').format
13   , CommandResult = require('../connection/command_result')
14   , MongoError = require('../error')
15   , Long = require('bson').Long
16   , getReadPreference = require('./shared').getReadPreference;
17
18 var WireProtocol = function(legacyWireProtocol) {
19   this.legacyWireProtocol = legacyWireProtocol;
20 }
21
22 //
23 // Execute a write operation
24 var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) {
25   if(ops.length == 0) throw new MongoError("insert must contain at least one document");
26   if(typeof options == 'function') {
27     callback = options;
28     options = {};
29     options = options || {};
30   }
31
32   // Split the ns up to get db and collection
33   var p = ns.split(".");
34   var d = p.shift();
35   // Options
36   var ordered = typeof options.ordered == 'boolean' ? options.ordered : true;
37   var writeConcern = options.writeConcern;
38
39   // return skeleton
40   var writeCommand = {};
41   writeCommand[type] = p.join('.');
42   writeCommand[opsField] = ops;
43   writeCommand.ordered = ordered;
44
45   // Did we specify a write concern
46   if(writeConcern && Object.keys(writeConcern).length > 0) {
47     writeCommand.writeConcern = writeConcern;
48   }
49
50   // If we have collation passed in
51   if(options.collation) {
52     for(var i = 0; i < writeCommand[opsField].length; i++) {
53       if(!writeCommand[opsField][i].collation) {
54         writeCommand[opsField][i].collation = options.collation;
55       }
56     }
57   }
58
59   // Do we have bypassDocumentValidation set, then enable it on the write command
60   if(typeof options.bypassDocumentValidation == 'boolean') {
61     writeCommand.bypassDocumentValidation = options.bypassDocumentValidation;
62   }
63
64   // Options object
65   var opts = { command: true };
66   var queryOptions = { checkKeys : false, numberToSkip: 0, numberToReturn: 1 };
67   if(type == 'insert') queryOptions.checkKeys = true;
68
69   // Ensure we support serialization of functions
70   if(options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions;
71   // Do not serialize the undefined fields
72   if(options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined;
73
74   try {
75     // Create write command
76     var cmd = new Query(bson, f("%s.$cmd", d), writeCommand, queryOptions);
77     // Execute command
78     pool.write(cmd, opts, callback);
79   } catch(err) {
80     callback(err);
81   }
82 }
83
84 //
85 // Needs to support legacy mass insert as well as ordered/unordered legacy
86 // emulation
87 //
88 WireProtocol.prototype.insert = function(pool, ismaster, ns, bson, ops, options, callback) {
89   executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback);
90 }
91
92 WireProtocol.prototype.update = function(pool, ismaster, ns, bson, ops, options, callback) {
93   executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback);
94 }
95
96 WireProtocol.prototype.remove = function(pool, ismaster, ns, bson, ops, options, callback) {
97   executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback);
98 }
99
100 WireProtocol.prototype.killCursor = function(bson, ns, cursorId, pool, callback) {
101   // Build command namespace
102   var parts = ns.split(/\./);
103   // Command namespace
104   var commandns = f('%s.$cmd', parts.shift());
105   // Create getMore command
106   var killcursorCmd = {
107     killCursors: parts.join('.'),
108     cursors: [cursorId]
109   }
110
111   // Build Query object
112   var query = new Query(bson, commandns, killcursorCmd, {
113       numberToSkip: 0, numberToReturn: -1
114     , checkKeys: false, returnFieldSelector: null
115   });
116
117   // Set query flags
118   query.slaveOk = true;
119
120   // Kill cursor callback
121   var killCursorCallback = function(err, result) {
122     if(err) {
123       if(typeof callback != 'function') return;
124       return callback(err);
125     }
126
127     // Result
128     var r = result.message;
129     // If we have a timed out query or a cursor that was killed
130     if((r.responseFlags & (1 << 0)) != 0) {
131       if(typeof callback != 'function') return;
132       return callback(new MongoError("cursor killed or timed out"), null);
133     }
134
135     if(!Array.isArray(r.documents) || r.documents.length == 0) {
136       if(typeof callback != 'function') return;
137       return callback(new MongoError(f('invalid killCursors result returned for cursor id %s', cursorState.cursorId)));
138     }
139
140     // Return the result
141     if(typeof callback == 'function') {
142       callback(null, r.documents[0]);
143     }
144   }
145
146   // Execute the kill cursor command
147   if(pool && pool.isConnected()) {
148     pool.write(query, {
149       command: true
150     }, killCursorCallback);
151   }
152 }
153
154 WireProtocol.prototype.getMore = function(bson, ns, cursorState, batchSize, raw, connection, options, callback) {
155   options = options || {};
156   // Build command namespace
157   var parts = ns.split(/\./);
158   // Command namespace
159   var commandns = f('%s.$cmd', parts.shift());
160
161   // Check if we have an maxTimeMS set
162   var maxTimeMS = typeof cursorState.cmd.maxTimeMS == 'number' ? cursorState.cmd.maxTimeMS : 3000;
163
164   // Create getMore command
165   var getMoreCmd = {
166     getMore: cursorState.cursorId,
167     collection: parts.join('.'),
168     batchSize: Math.abs(batchSize)
169   }
170
171   if(cursorState.cmd.tailable
172     && typeof cursorState.cmd.maxAwaitTimeMS == 'number') {
173     getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS;
174   }
175
176   // Build Query object
177   var query = new Query(bson, commandns, getMoreCmd, {
178       numberToSkip: 0, numberToReturn: -1
179     , checkKeys: false, returnFieldSelector: null
180   });
181
182   // Set query flags
183   query.slaveOk = true;
184
185   // Query callback
186   var queryCallback = function(err, result) {
187     if(err) return callback(err);
188     // Get the raw message
189     var r = result.message;
190
191     // If we have a timed out query or a cursor that was killed
192     if((r.responseFlags & (1 << 0)) != 0) {
193       return callback(new MongoError("cursor killed or timed out"), null);
194     }
195
196     // Raw, return all the extracted documents
197     if(raw) {
198       cursorState.documents = r.documents;
199       cursorState.cursorId = r.cursorId;
200       return callback(null, r.documents);
201     }
202
203     // We have an error detected
204     if(r.documents[0].ok == 0) {
205       return callback(MongoError.create(r.documents[0]));
206     }
207
208     // Ensure we have a Long valid cursor id
209     var cursorId = typeof r.documents[0].cursor.id == 'number'
210       ? Long.fromNumber(r.documents[0].cursor.id)
211       : r.documents[0].cursor.id;
212
213     // Set all the values
214     cursorState.documents = r.documents[0].cursor.nextBatch;
215     cursorState.cursorId = cursorId;
216
217     // Return the result
218     callback(null, r.documents[0], r.connection);
219   }
220
221   // Query options
222   var queryOptions = { command: true };
223
224   // If we have a raw query decorate the function
225   if(raw) {
226     queryOptions.raw = raw;
227   }
228
229   // Add the result field needed
230   queryOptions.documentsReturnedIn = 'nextBatch';
231
232   // Check if we need to promote longs
233   if(typeof cursorState.promoteLongs == 'boolean') {
234     queryOptions.promoteLongs = cursorState.promoteLongs;
235   }
236
237   if(typeof cursorState.promoteValues == 'boolean') {
238     queryCallback.promoteValues = cursorState.promoteValues;
239   }
240
241   if(typeof cursorState.promoteBuffers == 'boolean') {
242     queryCallback.promoteBuffers = cursorState.promoteBuffers;
243   }
244
245   // Write out the getMore command
246   connection.write(query, queryOptions, queryCallback);
247 }
248
249 WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) {
250   // Establish type of command
251   if(cmd.find) {
252     // Create the find command
253     var query = executeFindCommand(bson, ns, cmd, cursorState, topology, options)
254     // Mark the cmd as virtual
255     cmd.virtual = false;
256     // Signal the documents are in the firstBatch value
257     query.documentsReturnedIn = 'firstBatch';
258     // Return the query
259     return query;
260   } else if(cursorState.cursorId != null) {
261   } else if(cmd) {
262     return setupCommand(bson, ns, cmd, cursorState, topology, options);
263   } else {
264     throw new MongoError(f("command %s does not return a cursor", JSON.stringify(cmd)));
265   }
266 }
267
268 // // Command
269 // {
270 //     find: ns
271 //   , query: <object>
272 //   , limit: <n>
273 //   , fields: <object>
274 //   , skip: <n>
275 //   , hint: <string>
276 //   , explain: <boolean>
277 //   , snapshot: <boolean>
278 //   , batchSize: <n>
279 //   , returnKey: <boolean>
280 //   , maxScan: <n>
281 //   , min: <n>
282 //   , max: <n>
283 //   , showDiskLoc: <boolean>
284 //   , comment: <string>
285 //   , maxTimeMS: <n>
286 //   , raw: <boolean>
287 //   , readPreference: <ReadPreference>
288 //   , tailable: <boolean>
289 //   , oplogReplay: <boolean>
290 //   , noCursorTimeout: <boolean>
291 //   , awaitdata: <boolean>
292 //   , exhaust: <boolean>
293 //   , partial: <boolean>
294 // }
295
296 // FIND/GETMORE SPEC
297 // {
298 //     “find”: <string>,
299 //     “filter”: { ... },
300 //     “sort”: { ... },
301 //     “projection”: { ... },
302 //     “hint”: { ... },
303 //     “skip”: <int>,
304 //     “limit”: <int>,
305 //     “batchSize”: <int>,
306 //     “singleBatch”: <bool>,
307 //     “comment”: <string>,
308 //     “maxScan”: <int>,
309 //     “maxTimeMS”: <int>,
310 //     “max”: { ... },
311 //     “min”: { ... },
312 //     “returnKey”: <bool>,
313 //     “showRecordId”: <bool>,
314 //     “snapshot”: <bool>,
315 //     “tailable”: <bool>,
316 //     “oplogReplay”: <bool>,
317 //     “noCursorTimeout”: <bool>,
318 //     “awaitData”: <bool>,
319 //     “partial”: <bool>,
320 //     “$readPreference”: { ... }
321 // }
322
323 //
324 // Execute a find command
325 var executeFindCommand = function(bson, ns, cmd, cursorState, topology, options) {
326   // Ensure we have at least some options
327   options = options || {};
328   // Get the readPreference
329   var readPreference = getReadPreference(cmd, options);
330   // Set the optional batchSize
331   cursorState.batchSize = cmd.batchSize || cursorState.batchSize;
332
333   // Build command namespace
334   var parts = ns.split(/\./);
335   // Command namespace
336   var commandns = f('%s.$cmd', parts.shift());
337
338   // Build actual find command
339   var findCmd = {
340     find: parts.join('.')
341   };
342
343   // I we provided a filter
344   if(cmd.query) {
345     // Check if the user is passing in the $query parameter
346     if(cmd.query['$query']) {
347       findCmd.filter = cmd.query['$query'];
348     } else {
349       findCmd.filter = cmd.query;
350     }
351   }
352
353   // Sort value
354   var sortValue = cmd.sort;
355
356   // Handle issue of sort being an Array
357   if(Array.isArray(sortValue)) {
358     var sortObject = {};
359
360     if(sortValue.length > 0 && !Array.isArray(sortValue[0])) {
361       var sortDirection = sortValue[1];
362       // Translate the sort order text
363       if(sortDirection == 'asc') {
364         sortDirection = 1;
365       } else if(sortDirection == 'desc') {
366         sortDirection = -1;
367       }
368
369       // Set the sort order
370       sortObject[sortValue[0]] = sortDirection;
371     } else {
372       for(var i = 0; i < sortValue.length; i++) {
373         var sortDirection = sortValue[i][1];
374         // Translate the sort order text
375         if(sortDirection == 'asc') {
376           sortDirection = 1;
377         } else if(sortDirection == 'desc') {
378           sortDirection = -1;
379         }
380
381         // Set the sort order
382         sortObject[sortValue[i][0]] = sortDirection;
383       }
384     }
385
386     sortValue = sortObject;
387   };
388
389   // Add sort to command
390   if(cmd.sort) findCmd.sort = sortValue;
391   // Add a projection to the command
392   if(cmd.fields) findCmd.projection = cmd.fields;
393   // Add a hint to the command
394   if(cmd.hint) findCmd.hint = cmd.hint;
395   // Add a skip
396   if(cmd.skip) findCmd.skip = cmd.skip;
397   // Add a limit
398   if(cmd.limit) findCmd.limit = cmd.limit;
399   // Add a batchSize
400   if(typeof cmd.batchSize == 'number') findCmd.batchSize = Math.abs(cmd.batchSize);
401
402   // Check if we wish to have a singleBatch
403   if(cmd.limit < 0) {
404     findCmd.limit = Math.abs(cmd.limit);
405     findCmd.singleBatch = true;
406   }
407
408   // If we have comment set
409   if(cmd.comment) findCmd.comment = cmd.comment;
410
411   // If we have maxScan
412   if(cmd.maxScan) findCmd.maxScan = cmd.maxScan;
413
414   // If we have maxTimeMS set
415   if(cmd.maxTimeMS) findCmd.maxTimeMS = cmd.maxTimeMS;
416
417   // If we have min
418   if(cmd.min) findCmd.min = cmd.min;
419
420   // If we have max
421   if(cmd.max) findCmd.max = cmd.max;
422
423   // If we have returnKey set
424   if(cmd.returnKey) findCmd.returnKey = cmd.returnKey;
425
426   // If we have showDiskLoc set
427   if(cmd.showDiskLoc) findCmd.showRecordId = cmd.showDiskLoc;
428
429   // If we have snapshot set
430   if(cmd.snapshot) findCmd.snapshot = cmd.snapshot;
431
432   // If we have tailable set
433   if(cmd.tailable) findCmd.tailable = cmd.tailable;
434
435   // If we have oplogReplay set
436   if(cmd.oplogReplay) findCmd.oplogReplay = cmd.oplogReplay;
437
438   // If we have noCursorTimeout set
439   if(cmd.noCursorTimeout) findCmd.noCursorTimeout = cmd.noCursorTimeout;
440
441   // If we have awaitData set
442   if(cmd.awaitData) findCmd.awaitData = cmd.awaitData;
443   if(cmd.awaitdata) findCmd.awaitData = cmd.awaitdata;
444
445   // If we have partial set
446   if(cmd.partial) findCmd.partial = cmd.partial;
447
448   // If we have collation passed in
449   if(cmd.collation) findCmd.collation = cmd.collation;
450
451   // If we have explain, we need to rewrite the find command
452   // to wrap it in the explain command
453   if(cmd.explain) {
454     findCmd = {
455       explain: findCmd
456     }
457   }
458
459   // Did we provide a readConcern
460   if(cmd.readConcern) findCmd.readConcern = cmd.readConcern;
461
462   // Set up the serialize and ignoreUndefined fields
463   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
464     ? options.serializeFunctions : false;
465   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
466     ? options.ignoreUndefined : false;
467
468   // We have a Mongos topology, check if we need to add a readPreference
469   if(topology.type == 'mongos'
470     && readPreference
471     && readPreference.preference != 'primary') {
472     findCmd = {
473       '$query': findCmd,
474       '$readPreference': readPreference.toJSON()
475     };
476   }
477
478   // Build Query object
479   var query = new Query(bson, commandns, findCmd, {
480       numberToSkip: 0, numberToReturn: 1
481     , checkKeys: false, returnFieldSelector: null
482     , serializeFunctions: serializeFunctions, ignoreUndefined: ignoreUndefined
483   });
484
485   // Set query flags
486   query.slaveOk = readPreference.slaveOk();
487
488   // Return the query
489   return query;
490 }
491
492 //
493 // Set up a command cursor
494 var setupCommand = function(bson, ns, cmd, cursorState, topology, options) {
495   // Set empty options object
496   options = options || {}
497   // Get the readPreference
498   var readPreference = getReadPreference(cmd, options);
499
500   // Final query
501   var finalCmd = {};
502   for(var name in cmd) {
503     finalCmd[name] = cmd[name];
504   }
505
506   // Build command namespace
507   var parts = ns.split(/\./);
508
509   // Serialize functions
510   var serializeFunctions = typeof options.serializeFunctions == 'boolean'
511     ? options.serializeFunctions : false;
512
513   // Set up the serialize and ignoreUndefined fields
514   var ignoreUndefined = typeof options.ignoreUndefined == 'boolean'
515     ? options.ignoreUndefined : false;
516
517   // We have a Mongos topology, check if we need to add a readPreference
518   if(topology.type == 'mongos'
519     && readPreference
520     && readPreference.preference != 'primary') {
521     finalCmd = {
522       '$query': finalCmd,
523       '$readPreference': readPreference.toJSON()
524     };
525   }
526
527   // Build Query object
528   var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, {
529       numberToSkip: 0, numberToReturn: -1
530     , checkKeys: false, serializeFunctions: serializeFunctions
531     , ignoreUndefined: ignoreUndefined
532   });
533
534   // Set query flags
535   query.slaveOk = readPreference.slaveOk();
536
537   // Return the query
538   return query;
539 }
540
541 module.exports = WireProtocol;