3 var retrieveBSON = require('../connection/utils').retrieveBSON;
4 var BSON = retrieveBSON();
7 // Incrementing request id
10 // Wire command operation ids
12 var OP_GETMORE = 2005;
13 var OP_KILL_CURSORS = 2007;
16 var OPTS_TAILABLE_CURSOR = 2;
18 var OPTS_OPLOG_REPLAY = 8;
19 var OPTS_NO_CURSOR_TIMEOUT = 16;
20 var OPTS_AWAIT_DATA = 32;
21 var OPTS_EXHAUST = 64;
22 var OPTS_PARTIAL = 128;
25 var CURSOR_NOT_FOUND = 0;
26 var QUERY_FAILURE = 2;
27 var SHARD_CONFIG_STALE = 4;
28 var AWAIT_CAPABLE = 8;
30 /**************************************************************
32 **************************************************************/
33 var Query = function(bson, ns, query, options) {
35 // Basic options needed to be passed in
36 if(ns == null) throw new Error("ns must be specified for query");
37 if(query == null) throw new Error("query must be specified for query");
39 // Validate that we are not passing 0x00 in the colletion name
40 if(!!~ns.indexOf("\x00")) {
41 throw new Error("namespace cannot contain a null character");
49 // Ensure empty options
50 this.options = options || {};
53 this.numberToSkip = options.numberToSkip || 0;
54 this.numberToReturn = options.numberToReturn || 0;
55 this.returnFieldSelector = options.returnFieldSelector || null;
56 this.requestId = Query.getRequestId();
58 // Serialization option
59 this.serializeFunctions = typeof options.serializeFunctions == 'boolean' ? options.serializeFunctions : false;
60 this.ignoreUndefined = typeof options.ignoreUndefined == 'boolean' ? options.ignoreUndefined : false;
61 this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;
62 this.checkKeys = typeof options.checkKeys == 'boolean' ? options.checkKeys : true;
63 this.batchSize = self.numberToReturn;
66 this.tailable = false;
67 this.slaveOk = typeof options.slaveOk == 'boolean'? options.slaveOk : false;
68 this.oplogReplay = false;
69 this.noCursorTimeout = false;
70 this.awaitData = false;
76 // Assign a new request Id
77 Query.prototype.incRequestId = function() {
78 this.requestId = _requestId++;
82 // Assign a new request Id
83 Query.nextRequestId = function() {
84 return _requestId + 1;
88 // Uses a single allocated buffer for the process, avoiding multiple memory allocations
89 Query.prototype.toBin = function() {
92 var projection = null;
97 flags |= OPTS_TAILABLE_CURSOR;
104 if(this.oplogReplay) {
105 flags |= OPTS_OPLOG_REPLAY;
108 if(this.noCursorTimeout) {
109 flags |= OPTS_NO_CURSOR_TIMEOUT;
113 flags |= OPTS_AWAIT_DATA;
117 flags |= OPTS_EXHAUST;
121 flags |= OPTS_PARTIAL;
124 // If batchSize is different to self.numberToReturn
125 if(self.batchSize != self.numberToReturn) self.numberToReturn = self.batchSize;
127 // Allocate write protocol header buffer
128 var header = new Buffer(
131 + Buffer.byteLength(self.ns) + 1 // namespace
133 + 4 // numberToReturn
136 // Add header to buffers
137 buffers.push(header);
139 // Serialize the query
140 var query = self.bson.serialize(this.query, {
141 checkKeys: this.checkKeys,
142 serializeFunctions: this.serializeFunctions,
143 ignoreUndefined: this.ignoreUndefined,
146 // Add query document
149 if(self.returnFieldSelector && Object.keys(self.returnFieldSelector).length > 0) {
150 // Serialize the projection document
151 projection = self.bson.serialize(this.returnFieldSelector, {
152 checkKeys: this.checkKeys,
153 serializeFunctions: this.serializeFunctions,
154 ignoreUndefined: this.ignoreUndefined,
156 // Add projection document
157 buffers.push(projection);
160 // Total message size
161 var totalLength = header.length + query.length + (projection ? projection.length : 0);
166 // Write total document length
167 header[3] = (totalLength >> 24) & 0xff;
168 header[2] = (totalLength >> 16) & 0xff;
169 header[1] = (totalLength >> 8) & 0xff;
170 header[0] = (totalLength) & 0xff;
172 // Write header information requestId
173 header[index + 3] = (this.requestId >> 24) & 0xff;
174 header[index + 2] = (this.requestId >> 16) & 0xff;
175 header[index + 1] = (this.requestId >> 8) & 0xff;
176 header[index] = (this.requestId) & 0xff;
179 // Write header information responseTo
180 header[index + 3] = (0 >> 24) & 0xff;
181 header[index + 2] = (0 >> 16) & 0xff;
182 header[index + 1] = (0 >> 8) & 0xff;
183 header[index] = (0) & 0xff;
186 // Write header information OP_QUERY
187 header[index + 3] = (OP_QUERY >> 24) & 0xff;
188 header[index + 2] = (OP_QUERY >> 16) & 0xff;
189 header[index + 1] = (OP_QUERY >> 8) & 0xff;
190 header[index] = (OP_QUERY) & 0xff;
193 // Write header information flags
194 header[index + 3] = (flags >> 24) & 0xff;
195 header[index + 2] = (flags >> 16) & 0xff;
196 header[index + 1] = (flags >> 8) & 0xff;
197 header[index] = (flags) & 0xff;
200 // Write collection name
201 index = index + header.write(this.ns, index, 'utf8') + 1;
202 header[index - 1] = 0;
204 // Write header information flags numberToSkip
205 header[index + 3] = (this.numberToSkip >> 24) & 0xff;
206 header[index + 2] = (this.numberToSkip >> 16) & 0xff;
207 header[index + 1] = (this.numberToSkip >> 8) & 0xff;
208 header[index] = (this.numberToSkip) & 0xff;
211 // Write header information flags numberToReturn
212 header[index + 3] = (this.numberToReturn >> 24) & 0xff;
213 header[index + 2] = (this.numberToReturn >> 16) & 0xff;
214 header[index + 1] = (this.numberToReturn >> 8) & 0xff;
215 header[index] = (this.numberToReturn) & 0xff;
218 // Return the buffers
222 Query.getRequestId = function() {
226 /**************************************************************
228 **************************************************************/
229 var GetMore = function(bson, ns, cursorId, opts) {
231 this.numberToReturn = opts.numberToReturn || 0;
232 this.requestId = _requestId++;
235 this.cursorId = cursorId;
239 // Uses a single allocated buffer for the process, avoiding multiple memory allocations
240 GetMore.prototype.toBin = function() {
241 var length = 4 + Buffer.byteLength(this.ns) + 1 + 4 + 8 + (4 * 4);
242 // Create command buffer
245 var _buffer = new Buffer(length);
247 // Write header information
248 // index = write32bit(index, _buffer, length);
249 _buffer[index + 3] = (length >> 24) & 0xff;
250 _buffer[index + 2] = (length >> 16) & 0xff;
251 _buffer[index + 1] = (length >> 8) & 0xff;
252 _buffer[index] = (length) & 0xff;
255 // index = write32bit(index, _buffer, requestId);
256 _buffer[index + 3] = (this.requestId >> 24) & 0xff;
257 _buffer[index + 2] = (this.requestId >> 16) & 0xff;
258 _buffer[index + 1] = (this.requestId >> 8) & 0xff;
259 _buffer[index] = (this.requestId) & 0xff;
262 // index = write32bit(index, _buffer, 0);
263 _buffer[index + 3] = (0 >> 24) & 0xff;
264 _buffer[index + 2] = (0 >> 16) & 0xff;
265 _buffer[index + 1] = (0 >> 8) & 0xff;
266 _buffer[index] = (0) & 0xff;
269 // index = write32bit(index, _buffer, OP_GETMORE);
270 _buffer[index + 3] = (OP_GETMORE >> 24) & 0xff;
271 _buffer[index + 2] = (OP_GETMORE >> 16) & 0xff;
272 _buffer[index + 1] = (OP_GETMORE >> 8) & 0xff;
273 _buffer[index] = (OP_GETMORE) & 0xff;
276 // index = write32bit(index, _buffer, 0);
277 _buffer[index + 3] = (0 >> 24) & 0xff;
278 _buffer[index + 2] = (0 >> 16) & 0xff;
279 _buffer[index + 1] = (0 >> 8) & 0xff;
280 _buffer[index] = (0) & 0xff;
283 // Write collection name
284 index = index + _buffer.write(this.ns, index, 'utf8') + 1;
285 _buffer[index - 1] = 0;
288 // index = write32bit(index, _buffer, numberToReturn);
289 _buffer[index + 3] = (this.numberToReturn >> 24) & 0xff;
290 _buffer[index + 2] = (this.numberToReturn >> 16) & 0xff;
291 _buffer[index + 1] = (this.numberToReturn >> 8) & 0xff;
292 _buffer[index] = (this.numberToReturn) & 0xff;
296 // index = write32bit(index, _buffer, cursorId.getLowBits());
297 _buffer[index + 3] = (this.cursorId.getLowBits() >> 24) & 0xff;
298 _buffer[index + 2] = (this.cursorId.getLowBits() >> 16) & 0xff;
299 _buffer[index + 1] = (this.cursorId.getLowBits() >> 8) & 0xff;
300 _buffer[index] = (this.cursorId.getLowBits()) & 0xff;
303 // index = write32bit(index, _buffer, cursorId.getHighBits());
304 _buffer[index + 3] = (this.cursorId.getHighBits() >> 24) & 0xff;
305 _buffer[index + 2] = (this.cursorId.getHighBits() >> 16) & 0xff;
306 _buffer[index + 1] = (this.cursorId.getHighBits() >> 8) & 0xff;
307 _buffer[index] = (this.cursorId.getHighBits()) & 0xff;
314 /**************************************************************
316 **************************************************************/
317 var KillCursor = function(bson, cursorIds) {
318 this.requestId = _requestId++;
319 this.cursorIds = cursorIds;
323 // Uses a single allocated buffer for the process, avoiding multiple memory allocations
324 KillCursor.prototype.toBin = function() {
325 var length = 4 + 4 + (4 * 4) + (this.cursorIds.length * 8);
327 // Create command buffer
329 var _buffer = new Buffer(length);
331 // Write header information
332 // index = write32bit(index, _buffer, length);
333 _buffer[index + 3] = (length >> 24) & 0xff;
334 _buffer[index + 2] = (length >> 16) & 0xff;
335 _buffer[index + 1] = (length >> 8) & 0xff;
336 _buffer[index] = (length) & 0xff;
339 // index = write32bit(index, _buffer, requestId);
340 _buffer[index + 3] = (this.requestId >> 24) & 0xff;
341 _buffer[index + 2] = (this.requestId >> 16) & 0xff;
342 _buffer[index + 1] = (this.requestId >> 8) & 0xff;
343 _buffer[index] = (this.requestId) & 0xff;
346 // index = write32bit(index, _buffer, 0);
347 _buffer[index + 3] = (0 >> 24) & 0xff;
348 _buffer[index + 2] = (0 >> 16) & 0xff;
349 _buffer[index + 1] = (0 >> 8) & 0xff;
350 _buffer[index] = (0) & 0xff;
353 // index = write32bit(index, _buffer, OP_KILL_CURSORS);
354 _buffer[index + 3] = (OP_KILL_CURSORS >> 24) & 0xff;
355 _buffer[index + 2] = (OP_KILL_CURSORS >> 16) & 0xff;
356 _buffer[index + 1] = (OP_KILL_CURSORS >> 8) & 0xff;
357 _buffer[index] = (OP_KILL_CURSORS) & 0xff;
360 // index = write32bit(index, _buffer, 0);
361 _buffer[index + 3] = (0 >> 24) & 0xff;
362 _buffer[index + 2] = (0 >> 16) & 0xff;
363 _buffer[index + 1] = (0 >> 8) & 0xff;
364 _buffer[index] = (0) & 0xff;
368 // index = write32bit(index, _buffer, this.cursorIds.length);
369 _buffer[index + 3] = (this.cursorIds.length >> 24) & 0xff;
370 _buffer[index + 2] = (this.cursorIds.length >> 16) & 0xff;
371 _buffer[index + 1] = (this.cursorIds.length >> 8) & 0xff;
372 _buffer[index] = (this.cursorIds.length) & 0xff;
375 // Write all the cursor ids into the array
376 for(var i = 0; i < this.cursorIds.length; i++) {
378 // index = write32bit(index, _buffer, cursorIds[i].getLowBits());
379 _buffer[index + 3] = (this.cursorIds[i].getLowBits() >> 24) & 0xff;
380 _buffer[index + 2] = (this.cursorIds[i].getLowBits() >> 16) & 0xff;
381 _buffer[index + 1] = (this.cursorIds[i].getLowBits() >> 8) & 0xff;
382 _buffer[index] = (this.cursorIds[i].getLowBits()) & 0xff;
385 // index = write32bit(index, _buffer, cursorIds[i].getHighBits());
386 _buffer[index + 3] = (this.cursorIds[i].getHighBits() >> 24) & 0xff;
387 _buffer[index + 2] = (this.cursorIds[i].getHighBits() >> 16) & 0xff;
388 _buffer[index + 1] = (this.cursorIds[i].getHighBits() >> 8) & 0xff;
389 _buffer[index] = (this.cursorIds[i].getHighBits()) & 0xff;
397 var Response = function(bson, data, opts) {
398 opts = opts || {promoteLongs: true, promoteValues: true, promoteBuffers: false};
410 // Read the message length
411 this.length = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
412 this.index = this.index + 4;
414 // Fetch the request id for this reply
415 this.requestId = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
416 this.index = this.index + 4;
418 // Fetch the id of the request that triggered the response
419 this.responseTo = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
420 this.index = this.index + 4;
422 // Skip op-code field
423 this.index = this.index + 4;
426 this.responseFlags = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
427 this.index = this.index + 4;
430 var lowBits = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
431 this.index = this.index + 4;
432 var highBits = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
433 this.index = this.index + 4;
434 // Create long object
435 this.cursorId = new Long(lowBits, highBits);
437 // Unpack the starting from
438 this.startingFrom = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
439 this.index = this.index + 4;
441 // Unpack the number of objects returned
442 this.numberReturned = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
443 this.index = this.index + 4;
445 // Preallocate document array
446 this.documents = new Array(this.numberReturned);
449 this.cursorNotFound = (this.responseFlags & CURSOR_NOT_FOUND) != 0;
450 this.queryFailure = (this.responseFlags & QUERY_FAILURE) != 0;
451 this.shardConfigStale = (this.responseFlags & SHARD_CONFIG_STALE) != 0;
452 this.awaitCapable = (this.responseFlags & AWAIT_CAPABLE) != 0;
453 this.promoteLongs = typeof opts.promoteLongs == 'boolean' ? opts.promoteLongs : true;
454 this.promoteValues = typeof opts.promoteValues == 'boolean' ? opts.promoteValues : true;
455 this.promoteBuffers = typeof opts.promoteBuffers == 'boolean' ? opts.promoteBuffers : false;
458 Response.prototype.isParsed = function() {
462 Response.prototype.parse = function(options) {
463 // Don't parse again if not needed
464 if(this.parsed) return;
465 options = options || {};
467 // Allow the return of raw documents instead of parsing
468 var raw = options.raw || false;
469 var documentsReturnedIn = options.documentsReturnedIn || null;
470 var promoteLongs = typeof options.promoteLongs == 'boolean'
471 ? options.promoteLongs
472 : this.opts.promoteLongs;
473 var promoteValues = typeof options.promoteValues == 'boolean'
474 ? options.promoteValues
475 : this.opts.promoteValues;
476 var promoteBuffers = typeof options.promoteBuffers == 'boolean'
477 ? options.promoteBuffers
478 : this.opts.promoteBuffers
479 var bsonSize, _options;
481 // Set up the options
483 promoteLongs: promoteLongs,
484 promoteValues: promoteValues,
485 promoteBuffers: promoteBuffers
489 // Single document and documentsReturnedIn set
491 if(this.numberReturned == 1 && documentsReturnedIn != null && raw) {
492 // Calculate the bson size
493 bsonSize = this.data[this.index] | this.data[this.index + 1] << 8 | this.data[this.index + 2] << 16 | this.data[this.index + 3] << 24;
494 // Slice out the buffer containing the command result document
495 var document = this.data.slice(this.index, this.index + bsonSize);
496 // Set up field we wish to keep as raw
498 fieldsAsRaw[documentsReturnedIn] = true;
499 _options.fieldsAsRaw = fieldsAsRaw;
501 // Deserialize but keep the array of documents in non-parsed form
502 var doc = this.bson.deserialize(document, _options);
505 this.documents = doc.cursor[documentsReturnedIn];
506 this.numberReturned = this.documents.length;
507 // Ensure we have a Long valie cursor id
508 this.cursorId = typeof doc.cursor.id == 'number'
509 ? Long.fromNumber(doc.cursor.id)
513 this.index = this.index + bsonSize;
523 for(var i = 0; i < this.numberReturned; i++) {
524 bsonSize = this.data[this.index] | this.data[this.index + 1] << 8 | this.data[this.index + 2] << 16 | this.data[this.index + 3] << 24;
526 // If we have raw results specified slice the return document
528 this.documents[i] = this.data.slice(this.index, this.index + bsonSize);
530 this.documents[i] = this.bson.deserialize(this.data.slice(this.index, this.index + bsonSize), _options);
534 this.index = this.index + bsonSize;
545 , KillCursor: KillCursor