1 var shallowClone = require('../utils').shallowClone;
2 var stream = require('stream');
3 var util = require('util');
5 module.exports = GridFSBucketReadStream;
8 * A readable stream that enables you to read buffers from GridFS.
10 * Do not instantiate this class directly. Use `openDownloadStream()` instead.
13 * @param {Collection} chunks Handle for chunks collection
14 * @param {Collection} files Handle for files collection
15 * @param {Object} readPreference The read preference to use
16 * @param {Object} filter The query to use to find the file document
17 * @param {Object} [options=null] Optional settings.
18 * @param {Number} [options.sort=null] Optional sort for the file find query
19 * @param {Number} [options.skip=null] Optional skip for the file find query
20 * @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
21 * @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
22 * @fires GridFSBucketReadStream#error
23 * @fires GridFSBucketReadStream#file
24 * @return {GridFSBucketReadStream} a GridFSBucketReadStream instance.
27 function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
40 readPreference: readPreference
43 stream.Readable.call(this);
46 util.inherits(GridFSBucketReadStream, stream.Readable);
51 * @event GridFSBucketReadStream#error
56 * Fires when the stream loaded the file document corresponding to the
59 * @event GridFSBucketReadStream#file
64 * Emitted when a chunk of data is available to be consumed.
66 * @event GridFSBucketReadStream#data
71 * Fired when the stream is exhausted (no more data events).
73 * @event GridFSBucketReadStream#end
78 * Fired when the stream is exhausted and the underlying cursor is killed
80 * @event GridFSBucketReadStream#close
85 * Reads from the cursor and pushes to the stream.
89 GridFSBucketReadStream.prototype._read = function() {
95 waitForFile(_this, function() {
101 * Sets the 0-based offset in bytes to start streaming from. Throws
102 * an error if this stream has entered flowing mode
103 * (e.g. if you've already called `on('data')`)
105 * @param {Number} start Offset in bytes to start reading at
106 * @return {GridFSBucketReadStream}
109 GridFSBucketReadStream.prototype.start = function(start) {
110 throwIfInitialized(this);
111 this.s.options.start = start;
116 * Sets the 0-based offset in bytes to start streaming from. Throws
117 * an error if this stream has entered flowing mode
118 * (e.g. if you've already called `on('data')`)
120 * @param {Number} end Offset in bytes to stop reading at
121 * @return {GridFSBucketReadStream}
124 GridFSBucketReadStream.prototype.end = function(end) {
125 throwIfInitialized(this);
126 this.s.options.end = end;
131 * Marks this stream as aborted (will never push another `data` event)
132 * and kills the underlying cursor. Will emit the 'end' event, and then
133 * the 'close' event once the cursor is successfully killed.
136 * @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred.
137 * @fires GridFSBucketWriteStream#close
138 * @fires GridFSBucketWriteStream#end
141 GridFSBucketReadStream.prototype.abort = function(callback) {
144 this.destroyed = true;
146 this.s.cursor.close(function(error) {
148 callback && callback(error);
152 // If not initialized, fire close event because we will never
156 callback && callback();
164 function throwIfInitialized(self) {
166 throw new Error('You cannot change options after the stream has entered' +
175 function doRead(_this) {
176 if (_this.destroyed) {
180 _this.s.cursor.next(function(error, doc) {
181 if (_this.destroyed) {
185 return __handleError(_this, error);
189 return _this.s.cursor.close(function(error) {
191 return __handleError(_this, error);
197 var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
198 var expectedN = _this.s.expected++;
199 var expectedLength = Math.min(_this.s.file.chunkSize,
202 if (doc.n > expectedN) {
203 var errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n +
204 ', expected: ' + expectedN;
205 return __handleError(_this, new Error(errmsg));
208 if (doc.n < expectedN) {
209 var errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n +
210 ', expected: ' + expectedN;
211 return __handleError(_this, new Error(errmsg));
214 if (doc.data.length() !== expectedLength) {
215 if (bytesRemaining <= 0) {
216 var errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
217 return __handleError(_this, new Error(errmsg));
219 var errmsg = 'ChunkIsWrongSize: Got unexpected length: ' +
220 doc.data.length() + ', expected: ' + expectedLength;
221 return __handleError(_this, new Error(errmsg));
224 _this.s.bytesRead += doc.data.length();
226 if (doc.data.buffer.length === 0) {
227 return _this.push(null);
230 var sliceStart = null;
232 var buf = doc.data.buffer;
234 if (_this.s.bytesToSkip != null) {
235 sliceStart = _this.s.bytesToSkip;
236 _this.s.bytesToSkip = 0;
239 if (expectedN === _this.s.expectedEnd && _this.s.bytesToTrim != null) {
240 sliceEnd = _this.s.bytesToTrim;
243 // If the remaining amount of data left is < chunkSize read the right amount of data
244 if (_this.s.options.end && (
245 (_this.s.options.end - _this.s.bytesToSkip) < doc.data.length()
247 sliceEnd = (_this.s.options.end - _this.s.bytesToSkip);
250 if (sliceStart != null || sliceEnd != null) {
251 buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
262 function init(self) {
263 var findOneOptions = {};
264 if (self.s.readPreference) {
265 findOneOptions.readPreference = self.s.readPreference;
267 if (self.s.options && self.s.options.sort) {
268 findOneOptions.sort = self.s.options.sort;
270 if (self.s.options && self.s.options.skip) {
271 findOneOptions.skip = self.s.options.skip;
274 self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
276 return __handleError(self, error);
279 var identifier = self.s.filter._id ?
280 self.s.filter._id.toString() : self.s.filter.filename;
281 var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
282 var err = new Error(errmsg);
284 return __handleError(self, err);
287 // If document is empty, kill the stream immediately and don't
289 if (doc.length <= 0) {
294 if (self.destroyed) {
295 // If user destroys the stream before we have a cursor, wait
296 // until the query is done to say we're 'closed' because we can't
302 self.s.cursor = self.s.chunks.find({ files_id: doc._id }).sort({ n: 1 });
303 if (self.s.readPreference) {
304 self.s.cursor.setReadPreference(self.s.readPreference);
307 self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
309 self.s.bytesToSkip = handleStartOption(self, doc, self.s.cursor,
311 self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor,
313 self.emit('file', doc);
321 function waitForFile(_this, callback) {
331 _this.once('file', function() {
340 function handleStartOption(stream, doc, cursor, options) {
341 if (options && options.start != null) {
342 if (options.start > doc.length) {
343 throw new Error('Stream start (' + options.start + ') must not be ' +
344 'more than the length of the file (' + doc.length +')')
346 if (options.start < 0) {
347 throw new Error('Stream start (' + options.start + ') must not be ' +
350 if (options.end != null && options.end < options.start) {
351 throw new Error('Stream start (' + options.start + ') must not be ' +
352 'greater than stream end (' + options.end + ')');
355 cursor.skip(Math.floor(options.start / doc.chunkSize));
357 stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) *
359 stream.s.expected = Math.floor(options.start / doc.chunkSize);
361 return options.start - stream.s.bytesRead;
369 function handleEndOption(stream, doc, cursor, options) {
370 if (options && options.end != null) {
371 if (options.end > doc.length) {
372 throw new Error('Stream end (' + options.end + ') must not be ' +
373 'more than the length of the file (' + doc.length +')')
375 if (options.start < 0) {
376 throw new Error('Stream end (' + options.end + ') must not be ' +
380 var start = options.start != null ?
381 Math.floor(options.start / doc.chunkSize) :
384 cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
386 stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);
388 return (Math.ceil(options.end / doc.chunkSize) * doc.chunkSize) -
397 function __handleError(_this, error) {
398 _this.emit('error', error);