6d53cc2e074d8037ae86331edc4b337a2067cb7d
[aai/esr-gui.git] /
1 'use strict';
2
3 Object.defineProperty(exports, "__esModule", {
4     value: true
5 });
6 exports.default = queue;
7
8 var _baseIndexOf = require('lodash/_baseIndexOf');
9
10 var _baseIndexOf2 = _interopRequireDefault(_baseIndexOf);
11
12 var _isArray = require('lodash/isArray');
13
14 var _isArray2 = _interopRequireDefault(_isArray);
15
16 var _noop = require('lodash/noop');
17
18 var _noop2 = _interopRequireDefault(_noop);
19
20 var _baseRest = require('lodash/_baseRest');
21
22 var _baseRest2 = _interopRequireDefault(_baseRest);
23
24 var _onlyOnce = require('./onlyOnce');
25
26 var _onlyOnce2 = _interopRequireDefault(_onlyOnce);
27
28 var _setImmediate = require('./setImmediate');
29
30 var _setImmediate2 = _interopRequireDefault(_setImmediate);
31
32 var _DoublyLinkedList = require('./DoublyLinkedList');
33
34 var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);
35
36 function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
37
38 function queue(worker, concurrency, payload) {
39     if (concurrency == null) {
40         concurrency = 1;
41     } else if (concurrency === 0) {
42         throw new Error('Concurrency must not be zero');
43     }
44
45     function _insert(data, insertAtFront, callback) {
46         if (callback != null && typeof callback !== 'function') {
47             throw new Error('task callback must be a function');
48         }
49         q.started = true;
50         if (!(0, _isArray2.default)(data)) {
51             data = [data];
52         }
53         if (data.length === 0 && q.idle()) {
54             // call drain immediately if there are no tasks
55             return (0, _setImmediate2.default)(function () {
56                 q.drain();
57             });
58         }
59
60         for (var i = 0, l = data.length; i < l; i++) {
61             var item = {
62                 data: data[i],
63                 callback: callback || _noop2.default
64             };
65
66             if (insertAtFront) {
67                 q._tasks.unshift(item);
68             } else {
69                 q._tasks.push(item);
70             }
71         }
72         (0, _setImmediate2.default)(q.process);
73     }
74
75     function _next(tasks) {
76         return (0, _baseRest2.default)(function (args) {
77             workers -= 1;
78
79             for (var i = 0, l = tasks.length; i < l; i++) {
80                 var task = tasks[i];
81                 var index = (0, _baseIndexOf2.default)(workersList, task, 0);
82                 if (index >= 0) {
83                     workersList.splice(index);
84                 }
85
86                 task.callback.apply(task, args);
87
88                 if (args[0] != null) {
89                     q.error(args[0], task.data);
90                 }
91             }
92
93             if (workers <= q.concurrency - q.buffer) {
94                 q.unsaturated();
95             }
96
97             if (q.idle()) {
98                 q.drain();
99             }
100             q.process();
101         });
102     }
103
104     var workers = 0;
105     var workersList = [];
106     var q = {
107         _tasks: new _DoublyLinkedList2.default(),
108         concurrency: concurrency,
109         payload: payload,
110         saturated: _noop2.default,
111         unsaturated: _noop2.default,
112         buffer: concurrency / 4,
113         empty: _noop2.default,
114         drain: _noop2.default,
115         error: _noop2.default,
116         started: false,
117         paused: false,
118         push: function (data, callback) {
119             _insert(data, false, callback);
120         },
121         kill: function () {
122             q.drain = _noop2.default;
123             q._tasks.empty();
124         },
125         unshift: function (data, callback) {
126             _insert(data, true, callback);
127         },
128         process: function () {
129             while (!q.paused && workers < q.concurrency && q._tasks.length) {
130                 var tasks = [],
131                     data = [];
132                 var l = q._tasks.length;
133                 if (q.payload) l = Math.min(l, q.payload);
134                 for (var i = 0; i < l; i++) {
135                     var node = q._tasks.shift();
136                     tasks.push(node);
137                     data.push(node.data);
138                 }
139
140                 if (q._tasks.length === 0) {
141                     q.empty();
142                 }
143                 workers += 1;
144                 workersList.push(tasks[0]);
145
146                 if (workers === q.concurrency) {
147                     q.saturated();
148                 }
149
150                 var cb = (0, _onlyOnce2.default)(_next(tasks));
151                 worker(data, cb);
152             }
153         },
154         length: function () {
155             return q._tasks.length;
156         },
157         running: function () {
158             return workers;
159         },
160         workersList: function () {
161             return workersList;
162         },
163         idle: function () {
164             return q._tasks.length + workers === 0;
165         },
166         pause: function () {
167             q.paused = true;
168         },
169         resume: function () {
170             if (q.paused === false) {
171                 return;
172             }
173             q.paused = false;
174             var resumeCount = Math.min(q.concurrency, q._tasks.length);
175             // Need to call q.process once per concurrent
176             // worker to preserve full concurrency after pause
177             for (var w = 1; w <= resumeCount; w++) {
178                 (0, _setImmediate2.default)(q.process);
179             }
180         }
181     };
182     return q;
183 }
184 module.exports = exports['default'];