3 var si = require('set-immediate-shim');
4 var stream = require('readable-stream');
5 var util = require('util');
7 var Readable = stream.Readable;
// Export the stream constructor (function declarations are hoisted, so the name is usable here).
// NOTE(review): module.exports is assigned again at the bottom of this file (to createStreamAPI);
// once that later assignment runs it replaces this one - confirm which export is intended.
9 module.exports = ReaddirpReadable;
// Make ReaddirpReadable inherit from Readable (must happen before prototype methods are added below).
11 util.inherits(ReaddirpReadable, Readable);
// Readable stream constructor; may be called with or without `new`.
// `opts` is forwarded to Readable after objectMode is forced on.
// NOTE(review): some statements of this constructor (and its closing brace) are not visible
// in this view, e.g. where _paused/_warnings/_errors get initialised - confirm against the full file.
13 function ReaddirpReadable (opts) {
// support calling without `new`
14 if (!(this instanceof ReaddirpReadable)) return new ReaddirpReadable(opts);
// objectMode is always forced to true; this writes onto the opts object itself
18 opts.objectMode = true;
19 Readable.call(this, opts);
21 // backpressure not implemented at this point
22 this.highWaterMark = Infinity;
// flag checked by the other methods to stop emitting once destroy() was called
24 this._destroyed = false;
// register the 'pause'/'resume' listeners (see _pauseResumeErrors)
29 this._pauseResumeErrors();
// shorthand for the prototype that the methods below are attached to
32 var proto = ReaddirpReadable.prototype;
// Registers 'pause' and 'resume' listeners on the stream.
// On 'pause' the stream is marked as paused; on 'resume' every queued warning is emitted
// as 'warn' and every queued error as 'error', and both queues are emptied.
// NOTE(review): the declaration of `self` (presumably `var self = this`) and the closing
// braces are not visible in this view - confirm against the full file.
34 proto._pauseResumeErrors = function () {
36 self.on('pause', function () { self._paused = true });
37 self.on('resume', function () {
// nothing is flushed once the stream has been destroyed
38 if (self._destroyed) return;
// flush warnings queued while paused, then clear the queue in place
41 self._warnings.forEach(function (err) { self.emit('warn', err) });
42 self._warnings.length = 0;
// flush errors queued while paused, then clear the queue in place
44 self._errors.forEach(function (err) { self.emit('error', err) });
45 self._errors.length = 0;
49 // called for each entry
// `entry` is ignored when the stream has already been destroyed.
// NOTE(review): the rest of the body is not visible in this view.
50 proto._processEntry = function (entry) {
51 if (this._destroyed) return;
// _read implementation with an empty body: nothing happens when it is called.
// NOTE(review): presumably entries are supplied via _processEntry instead - confirm.
55 proto._read = function () { }
// Marks the stream as no longer readable and as destroyed; the other methods check
// _destroyed before emitting anything.
// NOTE(review): further statements of this method are not visible in this view.
57 proto.destroy = function () {
58 // when stream is destroyed it will emit nothing further, not even errors or warnings
60 this.readable = false;
61 this._destroyed = true;
// Exposed as `done` by createStreamAPI.
// NOTE(review): the body of this method is not visible in this view.
65 proto._done = function () {
69 // we emit errors and warnings async since we may handle errors like invalid args
70 // within the initial event loop before any event listeners subscribed
// Reports a non-fatal error: queued in _warnings while the stream is paused, otherwise
// emitted as a 'warn' event unless the stream was destroyed.
// NOTE(review): the async wrapper and the declaration of `self` are not visible in this view
// (presumably the `si` shim and `var self = this`) - confirm against the full file.
71 proto._handleError = function (err) {
// while paused, hold the warning back; it is flushed by the 'resume' listener
74 if (self._paused) return self._warnings.push(err);
75 if (!self._destroyed) self.emit('warn', err);
// Reports a fatal error: queued in _errors while the stream is paused, otherwise
// emitted as an 'error' event unless the stream was destroyed.
// NOTE(review): the declaration of `self` and any surrounding wrapper are not visible
// in this view - confirm against the full file.
79 proto._handleFatalError = function (err) {
// while paused, hold the error back; it is flushed by the 'resume' listener
82 if (self._paused) return self._errors.push(err);
83 if (!self._destroyed) self.emit('error', err);
// Creates a new ReaddirpReadable and exposes its internal handlers as functions
// bound to that stream instance.
// NOTE(review): the beginning of the returned object literal and the closing lines are
// not visible in this view.
87 function createStreamAPI () {
// NOTE(review): constructed without options; the constructor writes opts.objectMode,
// so it must default `opts` somewhere not shown here - confirm.
88 var stream = new ReaddirpReadable();
// each handler is bound so it can be passed around without losing `this`
92 , processEntry : stream._processEntry.bind(stream)
93 , done : stream._done.bind(stream)
94 , handleError : stream._handleError.bind(stream)
95 , handleFatalError : stream._handleFatalError.bind(stream)
// Final export of the module: the factory function.
// NOTE(review): this replaces the earlier `module.exports = ReaddirpReadable` assignment
// near the top of the file - confirm the earlier one is not relied upon.
99 module.exports = createStreamAPI;