2 * Copyright 2014 IBM Corp.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 var should = require("should");
19 var delayNode = require("../../../../nodes/core/core/89-delay.js");
20 var helper = require("../../helper.js");
22 var GRACE_PERCENTAGE=10;
24 var nanosToSeconds = 1000000000;
25 var millisToSeconds = 1000;
27 var secondsToMinutes = 60;
28 var secondsToHours = 3600;
29 var secondsToDays = 86400;
32 describe('delayNode', function() {
34 beforeEach(function(done) {
35 helper.startServer(done);
38 afterEach(function(done) {
40 helper.stopServer(done);
43 it('should be loaded', function(done) {
44 var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[[]]}];
45 helper.load(delayNode, flow, function() {
46 var delayNode1 = helper.getNode("delayNode1");
47 delayNode1.should.have.property('name', 'delayNode');
53 MILLIS : "milliseconds",
61 * Tells whether two numeric values are close enough to each other
62 * @param actualValue - the value we're testing
63 * @param expectedValue - the value we're matching the test value against
64 * @param tolerancePercent - the percentage of tolerated deviation (0 means equals)
66 function closeEnough(actualValue, expectedValue, tolerancePercent) {
68 var toleranceFraction = expectedValue * (tolerancePercent/100);
69 var minExpected = expectedValue - toleranceFraction;
70 var maxExpected = expectedValue + toleranceFraction;
72 if(actualValue >= minExpected && actualValue <= maxExpected) {
82 * @param aTimeout - the timeout quantity
83 * @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days
85 function genericDelayTest(aTimeout, aTimeoutUnit, done) {
86 var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"delay","timeout":aTimeout,"timeoutUnits":aTimeoutUnit,"rate":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
87 {id:"helperNode1", type:"helper", wires:[]}];
88 helper.load(delayNode, flow, function() {
89 var delayNode1 = helper.getNode("delayNode1");
90 var helperNode1 = helper.getNode("helperNode1");
91 helperNode1.on("input", function(msg) {
93 var endTime = process.hrtime(startTime);
94 var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] );
95 var runtimeSeconds = runtimeNanos / nanosToSeconds;
96 var aTimeoutUnifiedToSeconds;
98 // calculating the timeout in seconds
99 if(aTimeoutUnit == TimeUnitEnum.MILLIS) {
100 aTimeoutUnifiedToSeconds = aTimeout / millisToSeconds;
101 } else if(aTimeoutUnit == TimeUnitEnum.SECONDS) {
102 aTimeoutUnifiedToSeconds = aTimeout;
103 } else if(aTimeoutUnit == TimeUnitEnum.MINUTES) {
104 aTimeoutUnifiedToSeconds = aTimeout * secondsToMinutes;
105 } else if(aTimeoutUnit == TimeUnitEnum.HOURS) {
106 aTimeoutUnifiedToSeconds = aTimeout * secondsToHours;
107 } else if(aTimeoutUnit == TimeUnitEnum.DAYS) {
108 aTimeoutUnifiedToSeconds = aTimeout * secondsToDays;
111 if(closeEnough(runtimeSeconds, aTimeoutUnifiedToSeconds, GRACE_PERCENTAGE)) {
115 should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not close enough to exlected timeout seconds: " + aTimeoutUnifiedToSeconds);
124 var startTime = process.hrtime();
125 delayNode1.receive({payload:"delayMe"});
130 * We send a message, take a timestamp then when the message is received by the helper node, we take another timestamp.
131 * Then check if the message has been delayed by the expected amount.
133 it('delays the message in seconds', function(done) {
134 genericDelayTest(0.5, "seconds", done);
137 it('delays the message in milliseconds', function(done) {
138 genericDelayTest(500, "milliseconds", done);
141 it('delays the message in minutes', function(done) { // this is also 0.5 seconds
142 genericDelayTest(0.00833, "minutes", done);
145 it('delays the message in hours', function(done) { // this is also 0.5 seconds
146 genericDelayTest(0.0001388, "hours", done);
149 it('delays the message in days', function(done) { // this is also 0.5 seconds
150 genericDelayTest(0.000005787, "days", done);
154 * Runs a rate limit test - only testing seconds!
155 * @param aLimit - the message limit count
156 * @param runtimeInMillis - when to terminate run and count messages received
158 function genericRateLimitSECONDSTest(aLimit, runtimeInMillis, done) {
159 var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
160 {id:"helperNode1", type:"helper", wires:[]}];
161 helper.load(delayNode, flow, function() {
162 var delayNode1 = helper.getNode("delayNode1");
163 var helperNode1 = helper.getNode("helperNode1");
164 var receivedMessagesStack = [];
165 var rate = 1000/aLimit;
167 var receiveTimestamp;
169 helperNode1.on("input", function(msg) {
170 if(receiveTimestamp) {
171 var elapse = process.hrtime(receiveTimestamp);
172 var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000);
173 receiveInterval.should.be.above(rate * 0.9);
175 receiveTimestamp = process.hrtime();
176 receivedMessagesStack.push(msg);
179 var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
182 for(; i < possibleMaxMessageCount + 1; i++) {
183 delayNode1.receive({payload:i});
186 setTimeout(function() {
188 receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount);
189 for(var j = 0; j < receivedMessagesStack.length; j++) {
190 if(receivedMessagesStack[j].payload === j) {
191 if(j === (receivedMessagesStack.length -1)) { // last message, all matched so far
195 should.fail(null, null, "Received messages were not received in order. Message was " + receivedMessagesStack[i].payload + " on count " + i);
205 it('limits the message rate to 1 per second', function(done) {
206 genericRateLimitSECONDSTest(1, 1500, done);
209 it('limits the message rate to 2 per second, 2 seconds', function(done) {
211 genericRateLimitSECONDSTest(2, 2100, done);
215 * Runs a rate limit test with drop support - only testing seconds!
216 * @param aLimit - the message limit count
217 * @param runtimeInMillis - when to terminate run and count messages received
219 function dropRateLimitSECONDSTest(aLimit, runtimeInMillis, done) {
220 var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":aLimit,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"wires":[["helperNode1"]]},
221 {id:"helperNode1", type:"helper", wires:[]}];
222 helper.load(delayNode, flow, function() {
223 var delayNode1 = helper.getNode("delayNode1");
224 var helperNode1 = helper.getNode("helperNode1");
225 var receivedMessagesStack = [];
227 var rate = 1000/aLimit;
229 var receiveTimestamp;
231 helperNode1.on("input", function(msg) {
232 if(receiveTimestamp) {
233 var elapse = process.hrtime(receiveTimestamp);
234 var receiveInterval = (elapse[0] * 1000) + ((elapse[1] / nanosToSeconds) * 1000);
235 receiveInterval.should.be.above(rate * 0.9);
237 receiveTimestamp = process.hrtime();
238 receivedMessagesStack.push(msg);
241 var possibleMaxMessageCount = Math.ceil(aLimit * (runtimeInMillis / 1000) + aLimit); // +aLimit as at the start of the 2nd period, we're allowing the 3rd burst
244 delayNode1.receive({payload:i});
246 for(; i < possibleMaxMessageCount + 1; i++) {
247 setTimeout(function() {
248 delayNode1.receive({payload:i});
249 }, 2 * ((rate * i) / possibleMaxMessageCount) );
252 //we need to send a message delayed so that it doesn't get dropped
253 setTimeout(function() {
254 delayNode1.receive({payload:++i});
255 }, runtimeInMillis - 300); // should give enough time to squeeze another message in
257 setTimeout(function() {
259 receivedMessagesStack.length.should.be.lessThan(possibleMaxMessageCount + 1);
260 receivedMessagesStack.length.should.be.greaterThan(2); // ensure that we receive more than 1st and last message
261 receivedMessagesStack[0].payload.should.be.exactly(0); // means we received the last message injected just before test termination
262 var foundAtLeastOneDrop = false;
263 for(var i = 0; i < receivedMessagesStack.length; i++) {
265 if(receivedMessagesStack[i].payload - receivedMessagesStack[i - 1].payload > 1) {
266 foundAtLeastOneDrop = true;
270 foundAtLeastOneDrop.should.be.true;
279 it('limits the message rate to 1 per second, 4 seconds, with drop', function(done) {
281 dropRateLimitSECONDSTest(1, 4000, done);
284 it('limits the message rate to 2 per second, 5 seconds, with drop', function(done) {
286 dropRateLimitSECONDSTest(2, 5000, done);
290 * Returns true if the actualTimeout is gracefully in between the timeoutFrom and timeoutTo
291 * values. Gracefully means that inBetween could actually mean smaller/greater values
292 * than the timeout range so long as it's within an actual grace percentage.
293 * @param timeoutFrom - The expected timeout range (low number)
294 * @param timeoutTo - The expected timeout range (high number)
295 * @param actualTimeout - The actual measured timeout value of test
296 * @param allowedGracePercent - The percentage of grace allowed
298 function inBetweenDelays(timeoutFrom, timeoutTo, actualTimeout, allowedGracePercent) {
299 if(closeEnough(actualTimeout, timeoutFrom, allowedGracePercent)) {
301 } else if(closeEnough(actualTimeout, timeoutTo, allowedGracePercent)) {
303 } else if(timeoutFrom < actualTimeout && timeoutTo > actualTimeout) {
311 * Runs a RANDOM DELAY test, checks if the delay is in between the given timeout values
312 * @param aTimeoutFrom - the timeout quantity which is the minimal acceptable wait period
313 * @param aTimeoutTo - the timeout quantity which is the maximum acceptable wait period
314 * @param aTimeoutUnit - the unit of the timeout: milliseconds, seconds, minutes, hours, days
316 function randomDelayTest(aTimeoutFrom, aTimeoutTo, aTimeoutUnit, done) {
317 var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"random","timeout":5,"timeoutUnits":"seconds","rate":"1","rateUnits":"second","randomFirst":aTimeoutFrom,"randomLast":aTimeoutTo,"randomUnits":aTimeoutUnit,"drop":false,"wires":[["helperNode1"]]},
318 {id:"helperNode1", type:"helper", wires:[]}];
319 helper.load(delayNode, flow, function() {
320 var delayNode1 = helper.getNode("delayNode1");
321 var helperNode1 = helper.getNode("helperNode1");
322 helperNode1.on("input", function(msg) {
324 var endTime = process.hrtime(startTime);
325 var runtimeNanos = ( (endTime[0] * nanosToSeconds) + endTime[1] );
326 var runtimeSeconds = runtimeNanos / nanosToSeconds;
327 var aTimeoutFromUnifiedToSeconds;
328 var aTimeoutToUnifiedToSeconds;
330 // calculating the timeout in seconds
331 if(aTimeoutUnit == TimeUnitEnum.MILLIS) {
332 aTimeoutFromUnifiedToSeconds = aTimeoutFrom / millisToSeconds;
333 aTimeoutToUnifiedToSeconds = aTimeoutTo / millisToSeconds;
334 } else if(aTimeoutUnit == TimeUnitEnum.SECONDS) {
335 aTimeoutFromUnifiedToSeconds = aTimeoutFrom;
336 aTimeoutToUnifiedToSeconds = aTimeoutTo;
337 } else if(aTimeoutUnit == TimeUnitEnum.MINUTES) {
338 aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToMinutes;
339 aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToMinutes;
340 } else if(aTimeoutUnit == TimeUnitEnum.HOURS) {
341 aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToHours;
342 aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToHours;
343 } else if(aTimeoutUnit == TimeUnitEnum.DAYS) {
344 aTimeoutFromUnifiedToSeconds = aTimeoutFrom * secondsToDays;
345 aTimeoutToUnifiedToSeconds = aTimeoutTo * secondsToDays;
348 if(inBetweenDelays(aTimeoutFromUnifiedToSeconds, aTimeoutToUnifiedToSeconds, runtimeSeconds, GRACE_PERCENTAGE)) {
352 should.fail(null, null, "Delayed runtime seconds " + runtimeSeconds + " was not \"in between enough\" enough to expected values of: " + aTimeoutFromUnifiedToSeconds + " and " + aTimeoutToUnifiedToSeconds);
361 var startTime = process.hrtime();
362 delayNode1.receive({payload:"delayMe"});
366 it('randomly delays the message in seconds', function(done) {
367 randomDelayTest(0.4, 0.8, "seconds", done);
370 it(' randomly delays the message in milliseconds', function(done) {
371 randomDelayTest(400, 800, "milliseconds", done);
374 it('randomly delays the message in minutes', function(done) {
375 randomDelayTest(0.0066, 0.0133, "minutes", done);
378 it('delays the message in hours', function(done) {
379 randomDelayTest(0.000111111, 0.000222222, "hours", done);
382 it('delays the message in days', function(done) {
383 randomDelayTest(0.0000046296, 0.0000092593, "days", done);
386 it('handles bursts using a buffer', function(done) {
389 var flow = [{"id":"delayNode1","type":"delay","name":"delayNode","pauseType":"rate","timeout":5,"timeoutUnits":"seconds","rate":1000,"rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"wires":[["helperNode1"]]},
390 {id:"helperNode1", type:"helper", wires:[]}];
391 helper.load(delayNode, flow, function() {
392 var delayNode1 = helper.getNode("delayNode1");
393 var helperNode1 = helper.getNode("helperNode1");
395 var sinon = require('sinon');
397 var receivedWarning = false;
398 var messageBurstSize = 1500;
400 // we ensure that we note that a warning is received for buffer growth
401 sinon.stub(delayNode1, 'warn', function(warning){
402 if(warning.indexOf("buffer exceeded 1000 messages" > -1)) {
403 receivedWarning = true;
407 // we ensure that the warning is received for buffer size and that we get the last message
408 helperNode1.on("input", function(msg) {
409 if(msg.payload === (messageBurstSize - 1) && receivedWarning === true) {
410 done(); // it will timeout if we don't receive the last message
413 // send 1500 messages as quickly as possible
414 for(var i = 0; i < messageBurstSize; i++) {
415 delayNode1.receive({payload:i});