494c34a84b0c260012127d9e2efa59d60fc8a5c5
[aai/esr-gui.git] /
1 var shallowClone = require('../utils').shallowClone;
2 var stream = require('stream');
3 var util = require('util');
4
5 module.exports = GridFSBucketReadStream;
6
7 /**
8  * A readable stream that enables you to read buffers from GridFS.
9  *
10  * Do not instantiate this class directly. Use `openDownloadStream()` instead.
11  *
12  * @class
13  * @param {Collection} chunks Handle for chunks collection
14  * @param {Collection} files Handle for files collection
15  * @param {Object} readPreference The read preference to use
16  * @param {Object} filter The query to use to find the file document
17  * @param {Object} [options=null] Optional settings.
18  * @param {Number} [options.sort=null] Optional sort for the file find query
19  * @param {Number} [options.skip=null] Optional skip for the file find query
20  * @param {Number} [options.start=null] Optional 0-based offset in bytes to start streaming from
21  * @param {Number} [options.end=null] Optional 0-based offset in bytes to stop streaming before
22  * @fires GridFSBucketReadStream#error
23  * @fires GridFSBucketReadStream#file
24  * @return {GridFSBucketReadStream} a GridFSBucketReadStream instance.
25  */
26
27 function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
28   var _this = this;
29   this.s = {
30     bytesRead: 0,
31     chunks: chunks,
32     cursor: null,
33     expected: 0,
34     files: files,
35     filter: filter,
36     init: false,
37     expectedEnd: 0,
38     file: null,
39     options: options,
40     readPreference: readPreference
41   };
42
43   stream.Readable.call(this);
44 }
45
46 util.inherits(GridFSBucketReadStream, stream.Readable);
47
48 /**
49  * An error occurred
50  *
51  * @event GridFSBucketReadStream#error
52  * @type {Error}
53  */
54
55 /**
56  * Fires when the stream loaded the file document corresponding to the
57  * provided id.
58  *
59  * @event GridFSBucketReadStream#file
60  * @type {object}
61  */
62
63 /**
64  * Emitted when a chunk of data is available to be consumed.
65  *
66  * @event GridFSBucketReadStream#data
67  * @type {object}
68  */
69
70 /**
71  * Fired when the stream is exhausted (no more data events).
72  *
73  * @event GridFSBucketReadStream#end
74  * @type {object}
75  */
76
77 /**
78  * Fired when the stream is exhausted and the underlying cursor is killed
79  *
80  * @event GridFSBucketReadStream#close
81  * @type {object}
82  */
83
84 /**
85  * Reads from the cursor and pushes to the stream.
86  * @method
87  */
88
89 GridFSBucketReadStream.prototype._read = function() {
90   var _this = this;
91   if (this.destroyed) {
92     return;
93   }
94
95   waitForFile(_this, function() {
96     doRead(_this);
97   });
98 };
99
100 /**
101  * Sets the 0-based offset in bytes to start streaming from. Throws
102  * an error if this stream has entered flowing mode
103  * (e.g. if you've already called `on('data')`)
104  * @method
105  * @param {Number} start Offset in bytes to start reading at
106  * @return {GridFSBucketReadStream}
107  */
108
109 GridFSBucketReadStream.prototype.start = function(start) {
110   throwIfInitialized(this);
111   this.s.options.start = start;
112   return this;
113 };
114
115 /**
116  * Sets the 0-based offset in bytes to start streaming from. Throws
117  * an error if this stream has entered flowing mode
118  * (e.g. if you've already called `on('data')`)
119  * @method
120  * @param {Number} end Offset in bytes to stop reading at
121  * @return {GridFSBucketReadStream}
122  */
123
124 GridFSBucketReadStream.prototype.end = function(end) {
125   throwIfInitialized(this);
126   this.s.options.end = end;
127   return this;
128 };
129
130 /**
131  * Marks this stream as aborted (will never push another `data` event)
132  * and kills the underlying cursor. Will emit the 'end' event, and then
133  * the 'close' event once the cursor is successfully killed.
134  *
135  * @method
136  * @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred.
137  * @fires GridFSBucketWriteStream#close
138  * @fires GridFSBucketWriteStream#end
139  */
140
141 GridFSBucketReadStream.prototype.abort = function(callback) {
142   var _this = this;
143   this.push(null);
144   this.destroyed = true;
145   if (this.s.cursor) {
146     this.s.cursor.close(function(error) {
147       _this.emit('close');
148       callback && callback(error);
149     });
150   } else {
151     if (!this.s.init) {
152       // If not initialized, fire close event because we will never
153       // get a cursor
154       _this.emit('close');
155     }
156     callback && callback();
157   }
158 };
159
160 /**
161  * @ignore
162  */
163
164 function throwIfInitialized(self) {
165   if (self.s.init) {
166     throw new Error('You cannot change options after the stream has entered' +
167       'flowing mode!');
168   }
169 }
170
171 /**
172  * @ignore
173  */
174
175 function doRead(_this) {
176   if (_this.destroyed) {
177     return;
178   }
179
180   _this.s.cursor.next(function(error, doc) {
181     if (_this.destroyed) {
182       return;
183     }
184     if (error) {
185       return __handleError(_this, error);
186     }
187     if (!doc) {
188       _this.push(null);
189       return _this.s.cursor.close(function(error) {
190         if (error) {
191           return __handleError(_this, error);
192         }
193         _this.emit('close');
194       });
195     }
196
197     var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
198     var expectedN = _this.s.expected++;
199     var expectedLength = Math.min(_this.s.file.chunkSize,
200       bytesRemaining);
201
202     if (doc.n > expectedN) {
203       var errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n +
204         ', expected: ' + expectedN;
205       return __handleError(_this, new Error(errmsg));
206     }
207
208     if (doc.n < expectedN) {
209       var errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n +
210         ', expected: ' + expectedN;
211       return __handleError(_this, new Error(errmsg));
212     }
213
214     if (doc.data.length() !== expectedLength) {
215       if (bytesRemaining <= 0) {
216         var errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
217         return __handleError(_this, new Error(errmsg));
218       }
219       var errmsg = 'ChunkIsWrongSize: Got unexpected length: ' +
220         doc.data.length() + ', expected: ' + expectedLength;
221       return __handleError(_this, new Error(errmsg));
222     }
223
224     _this.s.bytesRead += doc.data.length();
225
226     if (doc.data.buffer.length === 0) {
227       return _this.push(null);
228     }
229
230     var sliceStart = null;
231     var sliceEnd = null;
232     var buf = doc.data.buffer;
233
234     if (_this.s.bytesToSkip != null) {
235       sliceStart = _this.s.bytesToSkip;
236       _this.s.bytesToSkip = 0;
237     }
238
239     if (expectedN === _this.s.expectedEnd && _this.s.bytesToTrim != null) {
240       sliceEnd = _this.s.bytesToTrim;
241     }
242
243     // If the remaining amount of data left is < chunkSize read the right amount of data
244     if (_this.s.options.end && (
245       (_this.s.options.end - _this.s.bytesToSkip) < doc.data.length()
246     )) {
247       sliceEnd = (_this.s.options.end - _this.s.bytesToSkip);
248     }
249
250     if (sliceStart != null || sliceEnd != null) {
251       buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
252     }
253
254     _this.push(buf);
255   });
256 };
257
258 /**
259  * @ignore
260  */
261
262 function init(self) {
263   var findOneOptions = {};
264   if (self.s.readPreference) {
265     findOneOptions.readPreference = self.s.readPreference;
266   }
267   if (self.s.options && self.s.options.sort) {
268     findOneOptions.sort = self.s.options.sort;
269   }
270   if (self.s.options && self.s.options.skip) {
271     findOneOptions.skip = self.s.options.skip;
272   }
273
274   self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
275     if (error) {
276       return __handleError(self, error);
277     }
278     if (!doc) {
279       var identifier = self.s.filter._id ?
280         self.s.filter._id.toString() : self.s.filter.filename;
281       var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
282       var err = new Error(errmsg);
283       err.code = 'ENOENT';
284       return __handleError(self, err);
285     }
286
287     // If document is empty, kill the stream immediately and don't
288     // execute any reads
289     if (doc.length <= 0) {
290       self.push(null);
291       return;
292     }
293
294     if (self.destroyed) {
295       // If user destroys the stream before we have a cursor, wait
296       // until the query is done to say we're 'closed' because we can't
297       // cancel a query.
298       self.emit('close');
299       return;
300     }
301
302     self.s.cursor = self.s.chunks.find({ files_id: doc._id }).sort({ n: 1 });
303     if (self.s.readPreference) {
304       self.s.cursor.setReadPreference(self.s.readPreference);
305     }
306
307     self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
308     self.s.file = doc;
309     self.s.bytesToSkip = handleStartOption(self, doc, self.s.cursor,
310       self.s.options);
311     self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor,
312       self.s.options);
313     self.emit('file', doc);
314   });
315 }
316
317 /**
318  * @ignore
319  */
320
321 function waitForFile(_this, callback) {
322   if (_this.s.file) {
323     return callback();
324   }
325
326   if (!_this.s.init) {
327     init(_this);
328     _this.s.init = true;
329   }
330
331   _this.once('file', function() {
332     callback();
333   });
334 };
335
336 /**
337  * @ignore
338  */
339
340 function handleStartOption(stream, doc, cursor, options) {
341   if (options && options.start != null) {
342     if (options.start > doc.length) {
343       throw new Error('Stream start (' + options.start + ') must not be ' +
344         'more than the length of the file (' + doc.length +')')
345     }
346     if (options.start < 0) {
347       throw new Error('Stream start (' + options.start + ') must not be ' +
348         'negative');
349     }
350     if (options.end != null && options.end < options.start) {
351       throw new Error('Stream start (' + options.start + ') must not be ' +
352         'greater than stream end (' + options.end + ')');
353     }
354
355     cursor.skip(Math.floor(options.start / doc.chunkSize));
356
357     stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) *
358       doc.chunkSize;
359     stream.s.expected = Math.floor(options.start / doc.chunkSize);
360
361     return options.start - stream.s.bytesRead;
362   }
363 }
364
365 /**
366  * @ignore
367  */
368
369 function handleEndOption(stream, doc, cursor, options) {
370   if (options && options.end != null) {
371     if (options.end > doc.length) {
372       throw new Error('Stream end (' + options.end + ') must not be ' +
373         'more than the length of the file (' + doc.length +')')
374     }
375     if (options.start < 0) {
376       throw new Error('Stream end (' + options.end + ') must not be ' +
377         'negative');
378     }
379
380     var start = options.start != null ?
381       Math.floor(options.start / doc.chunkSize) :
382       0;
383
384     cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
385
386     stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);
387
388     return (Math.ceil(options.end / doc.chunkSize) * doc.chunkSize) -
389       options.end;
390   }
391 }
392
393 /**
394  * @ignore
395  */
396
397 function __handleError(_this, error) {
398   _this.emit('error', error);
399 }