3 var f = require('util').format
4 , Long = require('bson').Long
5 , setProperty = require('./utils').setProperty
6 , getProperty = require('./utils').getProperty
7 , getSingleProperty = require('./utils').getSingleProperty;
9 // Incrementing request id
12 // Wire command operation ids
14 var OP_GETMORE = 2005;
15 var OP_KILL_CURSORS = 2007;
19 var OPTS_TAILABLE_CURSOR = 2;
21 var OPTS_OPLOG_REPLAY = 8;
22 var OPTS_NO_CURSOR_TIMEOUT = 16;
23 var OPTS_AWAIT_DATA = 32;
24 var OPTS_EXHAUST = 64;
25 var OPTS_PARTIAL = 128;
28 var CURSOR_NOT_FOUND = 0;
29 var QUERY_FAILURE = 2;
30 var SHARD_CONFIG_STALE = 4;
31 var AWAIT_CAPABLE = 8;
33 /**************************************************************
35 **************************************************************/
36 var Query = function(bson, ns, query, options) {
38 // Basic options needed to be passed in
39 if(ns == null) throw new Error("ns must be specified for query");
40 if(query == null) throw new Error("query must be specified for query");
42 // Validate that we are not passing 0x00 in the colletion name
43 if(!!~ns.indexOf("\x00")) {
44 throw new Error("namespace cannot contain a null character");
52 // Ensure empty options
53 this.options = options || {};
56 this.numberToSkip = options.numberToSkip || 0;
57 this.numberToReturn = options.numberToReturn || 0;
58 this.returnFieldSelector = options.returnFieldSelector || null;
59 this.requestId = Query.getRequestId();
61 // Serialization option
62 this.serializeFunctions = typeof options.serializeFunctions == 'boolean' ? options.serializeFunctions : false;
63 this.ignoreUndefined = typeof options.ignoreUndefined == 'boolean' ? options.ignoreUndefined : false;
64 this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;
65 this.checkKeys = typeof options.checkKeys == 'boolean' ? options.checkKeys : true;
66 this.batchSize = self.numberToReturn;
69 this.tailable = false;
70 this.slaveOk = typeof options.slaveOk == 'boolean'? options.slaveOk : false;
71 this.oplogReplay = false;
72 this.noCursorTimeout = false;
73 this.awaitData = false;
79 // Assign a new request Id
80 Query.prototype.incRequestId = function() {
81 this.requestId = _requestId++;
85 // Assign a new request Id
86 Query.nextRequestId = function() {
87 return _requestId + 1;
91 // Uses a single allocated buffer for the process, avoiding multiple memory allocations
92 Query.prototype.toBin = function() {
95 var projection = null;
100 flags |= OPTS_TAILABLE_CURSOR;
107 if(this.oplogReplay) {
108 flags |= OPTS_OPLOG_REPLAY;
111 if(this.noCursorTimeout) {
112 flags |= OPTS_NO_CURSOR_TIMEOUT;
116 flags |= OPTS_AWAIT_DATA;
120 flags |= OPTS_EXHAUST;
124 flags |= OPTS_PARTIAL;
127 // If batchSize is different to self.numberToReturn
128 if(self.batchSize != self.numberToReturn) self.numberToReturn = self.batchSize;
130 // Allocate write protocol header buffer
131 var header = new Buffer(
134 + Buffer.byteLength(self.ns) + 1 // namespace
136 + 4 // numberToReturn
139 // Add header to buffers
140 buffers.push(header);
142 // Serialize the query
143 var query = self.bson.serialize(this.query
146 , this.serializeFunctions
147 , 0, this.ignoreUndefined);
149 // Add query document
152 if(self.returnFieldSelector && Object.keys(self.returnFieldSelector).length > 0) {
153 // Serialize the projection document
154 projection = self.bson.serialize(this.returnFieldSelector, this.checkKeys, true, this.serializeFunctions, this.ignoreUndefined);
155 // Add projection document
156 buffers.push(projection);
159 // Total message size
160 var totalLength = header.length + query.length + (projection ? projection.length : 0);
165 // Write total document length
166 header[3] = (totalLength >> 24) & 0xff;
167 header[2] = (totalLength >> 16) & 0xff;
168 header[1] = (totalLength >> 8) & 0xff;
169 header[0] = (totalLength) & 0xff;
171 // Write header information requestId
172 header[index + 3] = (this.requestId >> 24) & 0xff;
173 header[index + 2] = (this.requestId >> 16) & 0xff;
174 header[index + 1] = (this.requestId >> 8) & 0xff;
175 header[index] = (this.requestId) & 0xff;
178 // Write header information responseTo
179 header[index + 3] = (0 >> 24) & 0xff;
180 header[index + 2] = (0 >> 16) & 0xff;
181 header[index + 1] = (0 >> 8) & 0xff;
182 header[index] = (0) & 0xff;
185 // Write header information OP_QUERY
186 header[index + 3] = (OP_QUERY >> 24) & 0xff;
187 header[index + 2] = (OP_QUERY >> 16) & 0xff;
188 header[index + 1] = (OP_QUERY >> 8) & 0xff;
189 header[index] = (OP_QUERY) & 0xff;
192 // Write header information flags
193 header[index + 3] = (flags >> 24) & 0xff;
194 header[index + 2] = (flags >> 16) & 0xff;
195 header[index + 1] = (flags >> 8) & 0xff;
196 header[index] = (flags) & 0xff;
199 // Write collection name
200 index = index + header.write(this.ns, index, 'utf8') + 1;
201 header[index - 1] = 0;
203 // Write header information flags numberToSkip
204 header[index + 3] = (this.numberToSkip >> 24) & 0xff;
205 header[index + 2] = (this.numberToSkip >> 16) & 0xff;
206 header[index + 1] = (this.numberToSkip >> 8) & 0xff;
207 header[index] = (this.numberToSkip) & 0xff;
210 // Write header information flags numberToReturn
211 header[index + 3] = (this.numberToReturn >> 24) & 0xff;
212 header[index + 2] = (this.numberToReturn >> 16) & 0xff;
213 header[index + 1] = (this.numberToReturn >> 8) & 0xff;
214 header[index] = (this.numberToReturn) & 0xff;
217 // Return the buffers
221 Query.getRequestId = function() {
225 /**************************************************************
227 **************************************************************/
228 var GetMore = function(bson, ns, cursorId, opts) {
230 this.numberToReturn = opts.numberToReturn || 0;
231 this.requestId = _requestId++;
234 this.cursorId = cursorId;
238 // Uses a single allocated buffer for the process, avoiding multiple memory allocations
239 GetMore.prototype.toBin = function() {
240 var length = 4 + Buffer.byteLength(this.ns) + 1 + 4 + 8 + (4 * 4);
241 // Create command buffer
244 var _buffer = new Buffer(length);
246 // Write header information
247 // index = write32bit(index, _buffer, length);
248 _buffer[index + 3] = (length >> 24) & 0xff;
249 _buffer[index + 2] = (length >> 16) & 0xff;
250 _buffer[index + 1] = (length >> 8) & 0xff;
251 _buffer[index] = (length) & 0xff;
254 // index = write32bit(index, _buffer, requestId);
255 _buffer[index + 3] = (this.requestId >> 24) & 0xff;
256 _buffer[index + 2] = (this.requestId >> 16) & 0xff;
257 _buffer[index + 1] = (this.requestId >> 8) & 0xff;
258 _buffer[index] = (this.requestId) & 0xff;
261 // index = write32bit(index, _buffer, 0);
262 _buffer[index + 3] = (0 >> 24) & 0xff;
263 _buffer[index + 2] = (0 >> 16) & 0xff;
264 _buffer[index + 1] = (0 >> 8) & 0xff;
265 _buffer[index] = (0) & 0xff;
268 // index = write32bit(index, _buffer, OP_GETMORE);
269 _buffer[index + 3] = (OP_GETMORE >> 24) & 0xff;
270 _buffer[index + 2] = (OP_GETMORE >> 16) & 0xff;
271 _buffer[index + 1] = (OP_GETMORE >> 8) & 0xff;
272 _buffer[index] = (OP_GETMORE) & 0xff;
275 // index = write32bit(index, _buffer, 0);
276 _buffer[index + 3] = (0 >> 24) & 0xff;
277 _buffer[index + 2] = (0 >> 16) & 0xff;
278 _buffer[index + 1] = (0 >> 8) & 0xff;
279 _buffer[index] = (0) & 0xff;
282 // Write collection name
283 index = index + _buffer.write(this.ns, index, 'utf8') + 1;
284 _buffer[index - 1] = 0;
287 // index = write32bit(index, _buffer, numberToReturn);
288 _buffer[index + 3] = (this.numberToReturn >> 24) & 0xff;
289 _buffer[index + 2] = (this.numberToReturn >> 16) & 0xff;
290 _buffer[index + 1] = (this.numberToReturn >> 8) & 0xff;
291 _buffer[index] = (this.numberToReturn) & 0xff;
295 // index = write32bit(index, _buffer, cursorId.getLowBits());
296 _buffer[index + 3] = (this.cursorId.getLowBits() >> 24) & 0xff;
297 _buffer[index + 2] = (this.cursorId.getLowBits() >> 16) & 0xff;
298 _buffer[index + 1] = (this.cursorId.getLowBits() >> 8) & 0xff;
299 _buffer[index] = (this.cursorId.getLowBits()) & 0xff;
302 // index = write32bit(index, _buffer, cursorId.getHighBits());
303 _buffer[index + 3] = (this.cursorId.getHighBits() >> 24) & 0xff;
304 _buffer[index + 2] = (this.cursorId.getHighBits() >> 16) & 0xff;
305 _buffer[index + 1] = (this.cursorId.getHighBits() >> 8) & 0xff;
306 _buffer[index] = (this.cursorId.getHighBits()) & 0xff;
313 /**************************************************************
315 **************************************************************/
316 var KillCursor = function(bson, cursorIds) {
317 this.requestId = _requestId++;
318 this.cursorIds = cursorIds;
322 // Uses a single allocated buffer for the process, avoiding multiple memory allocations
323 KillCursor.prototype.toBin = function() {
324 var length = 4 + 4 + (4 * 4) + (this.cursorIds.length * 8);
326 // Create command buffer
328 var _buffer = new Buffer(length);
330 // Write header information
331 // index = write32bit(index, _buffer, length);
332 _buffer[index + 3] = (length >> 24) & 0xff;
333 _buffer[index + 2] = (length >> 16) & 0xff;
334 _buffer[index + 1] = (length >> 8) & 0xff;
335 _buffer[index] = (length) & 0xff;
338 // index = write32bit(index, _buffer, requestId);
339 _buffer[index + 3] = (this.requestId >> 24) & 0xff;
340 _buffer[index + 2] = (this.requestId >> 16) & 0xff;
341 _buffer[index + 1] = (this.requestId >> 8) & 0xff;
342 _buffer[index] = (this.requestId) & 0xff;
345 // index = write32bit(index, _buffer, 0);
346 _buffer[index + 3] = (0 >> 24) & 0xff;
347 _buffer[index + 2] = (0 >> 16) & 0xff;
348 _buffer[index + 1] = (0 >> 8) & 0xff;
349 _buffer[index] = (0) & 0xff;
352 // index = write32bit(index, _buffer, OP_KILL_CURSORS);
353 _buffer[index + 3] = (OP_KILL_CURSORS >> 24) & 0xff;
354 _buffer[index + 2] = (OP_KILL_CURSORS >> 16) & 0xff;
355 _buffer[index + 1] = (OP_KILL_CURSORS >> 8) & 0xff;
356 _buffer[index] = (OP_KILL_CURSORS) & 0xff;
359 // index = write32bit(index, _buffer, 0);
360 _buffer[index + 3] = (0 >> 24) & 0xff;
361 _buffer[index + 2] = (0 >> 16) & 0xff;
362 _buffer[index + 1] = (0 >> 8) & 0xff;
363 _buffer[index] = (0) & 0xff;
367 // index = write32bit(index, _buffer, this.cursorIds.length);
368 _buffer[index + 3] = (this.cursorIds.length >> 24) & 0xff;
369 _buffer[index + 2] = (this.cursorIds.length >> 16) & 0xff;
370 _buffer[index + 1] = (this.cursorIds.length >> 8) & 0xff;
371 _buffer[index] = (this.cursorIds.length) & 0xff;
374 // Write all the cursor ids into the array
375 for(var i = 0; i < this.cursorIds.length; i++) {
377 // index = write32bit(index, _buffer, cursorIds[i].getLowBits());
378 _buffer[index + 3] = (this.cursorIds[i].getLowBits() >> 24) & 0xff;
379 _buffer[index + 2] = (this.cursorIds[i].getLowBits() >> 16) & 0xff;
380 _buffer[index + 1] = (this.cursorIds[i].getLowBits() >> 8) & 0xff;
381 _buffer[index] = (this.cursorIds[i].getLowBits()) & 0xff;
384 // index = write32bit(index, _buffer, cursorIds[i].getHighBits());
385 _buffer[index + 3] = (this.cursorIds[i].getHighBits() >> 24) & 0xff;
386 _buffer[index + 2] = (this.cursorIds[i].getHighBits() >> 16) & 0xff;
387 _buffer[index + 1] = (this.cursorIds[i].getHighBits() >> 8) & 0xff;
388 _buffer[index] = (this.cursorIds[i].getHighBits()) & 0xff;
396 var Response = function(bson, data, opts) {
397 opts = opts || {promoteLongs: true, promoteValues: true, promoteBuffers: false};
409 // Read the message length
410 this.length = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
411 this.index = this.index + 4;
413 // Fetch the request id for this reply
414 this.requestId = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
415 this.index = this.index + 4;
417 // Fetch the id of the request that triggered the response
418 this.responseTo = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
419 this.index = this.index + 4;
421 // Skip op-code field
422 this.index = this.index + 4;
425 this.responseFlags = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
426 this.index = this.index + 4;
429 var lowBits = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
430 this.index = this.index + 4;
431 var highBits = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
432 this.index = this.index + 4;
433 // Create long object
434 this.cursorId = new Long(lowBits, highBits);
436 // Unpack the starting from
437 this.startingFrom = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
438 this.index = this.index + 4;
440 // Unpack the number of objects returned
441 this.numberReturned = data[this.index] | data[this.index + 1] << 8 | data[this.index + 2] << 16 | data[this.index + 3] << 24;
442 this.index = this.index + 4;
444 // Preallocate document array
445 this.documents = new Array(this.numberReturned);
448 this.cursorNotFound = (this.responseFlags & CURSOR_NOT_FOUND) != 0;
449 this.queryFailure = (this.responseFlags & QUERY_FAILURE) != 0;
450 this.shardConfigStale = (this.responseFlags & SHARD_CONFIG_STALE) != 0;
451 this.awaitCapable = (this.responseFlags & AWAIT_CAPABLE) != 0;
452 this.promoteLongs = typeof opts.promoteLongs == 'boolean' ? opts.promoteLongs : true;
453 this.promoteValues = typeof opts.promoteValues == 'boolean' ? opts.promoteValues : true;
454 this.promoteBuffers = typeof opts.promoteBuffers == 'boolean' ? opts.promoteBuffers : false;
457 Response.prototype.isParsed = function() {
461 // Validation buffers
462 var firstBatch = new Buffer('firstBatch', 'utf8');
463 var nextBatch = new Buffer('nextBatch', 'utf8');
464 var cursorId = new Buffer('id', 'utf8').toString('hex');
466 var documentBuffers = {
467 firstBatch: firstBatch.toString('hex'),
468 nextBatch: nextBatch.toString('hex')
471 Response.prototype.parse = function(options) {
472 // Don't parse again if not needed
473 if(this.parsed) return;
474 options = options || {};
476 // Allow the return of raw documents instead of parsing
477 var raw = options.raw || false;
478 var documentsReturnedIn = options.documentsReturnedIn || null;
479 var promoteLongs = typeof options.promoteLongs == 'boolean'
480 ? options.promoteLongs
481 : this.opts.promoteLongs;
482 var promoteValues = typeof options.promoteValues == 'boolean'
483 ? options.promoteValues
484 : this.opts.promoteValues;
485 var promoteBuffers = typeof options.promoteBuffers == 'boolean'
486 ? options.promoteBuffers
487 : this.opts.promoteBuffers
490 // Single document and documentsReturnedIn set
492 if(this.numberReturned == 1 && documentsReturnedIn != null && raw) {
493 // Calculate the bson size
494 var bsonSize = this.data[this.index] | this.data[this.index + 1] << 8 | this.data[this.index + 2] << 16 | this.data[this.index + 3] << 24;
495 // Slice out the buffer containing the command result document
496 var document = this.data.slice(this.index, this.index + bsonSize);
497 // Set up field we wish to keep as raw
499 fieldsAsRaw[documentsReturnedIn] = true;
500 // Set up the options
502 promoteLongs: promoteLongs,
503 promoteValues: promoteValues,
504 promoteBuffers: promoteBuffers,
505 fieldsAsRaw: fieldsAsRaw
508 // Deserialize but keep the array of documents in non-parsed form
509 var doc = this.bson.deserialize(document, _options);
512 this.documents = doc.cursor[documentsReturnedIn];
513 this.numberReturned = this.documents.length;
514 // Ensure we have a Long valie cursor id
515 this.cursorId = typeof doc.cursor.id == 'number'
516 ? Long.fromNumber(doc.cursor.id)
520 this.index = this.index + bsonSize;
530 for(var i = 0; i < this.numberReturned; i++) {
531 var bsonSize = this.data[this.index] | this.data[this.index + 1] << 8 | this.data[this.index + 2] << 16 | this.data[this.index + 3] << 24;
533 var _options = {promoteLongs: promoteLongs, promoteValues: promoteValues, promoteBuffers: promoteBuffers};
535 // If we have raw results specified slice the return document
537 this.documents[i] = this.data.slice(this.index, this.index + bsonSize);
539 this.documents[i] = this.bson.deserialize(this.data.slice(this.index, this.index + bsonSize), _options);
543 this.index = this.index + bsonSize;
554 , KillCursor: KillCursor