Fix K6 KPI Legacy Batch Reporting issues II 25/141325/3
author Toine Siebelink <toine.siebelink@est.tech>
Tue, 17 Jun 2025 08:09:22 +0000 (09:09 +0100)
committer Toine Siebelink <toine.siebelink@est.tech>
Mon, 23 Jun 2025 07:25:20 +0000 (07:25 +0000)
- Changed legacy batch consume to run once per second (constant-arrival-rate) rather than one big loop (see the consume sketch below)
- Updated k6 install to version 1.0.0! (the old version caused most of the issues that made this patch take so long)
- Removed import of crypto as it is now part of the standard k6 install
- No more checks on total messages; just record the number of messages consumed every second
- Fixed 'timeout' configuration parameter: it is 'maxWait'!
- Fixed some incorrect casing on variables
- Added (now debug-level) logging for troubleshooting produce/consume of legacy batch events
- Improved name of config parameter: containerUpTime to containerCoolDownTime
- Reduced cool down time for kpi to 10s (it's really only needed for endurance)
- Improved several variable names (consistency!)
- Reduced scope of data being passed on (to what is used), e.g. "data.metrics" instead of "data" (see the summary sketch below)
- Some formatting changes
TODO remove small test config
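
Example: the reworked legacy batch consume in a nutshell. This is a minimal,
self-contained sketch of the pattern the patch introduces (broker, topic,
limit and scenario values follow kpi.json and the runner; the trend name is
illustrative only):

    import { Reader } from 'k6/x/kafka';
    import { Trend } from 'k6/metrics';

    // 'maxWait' (not 'timeout'!) caps how long one consume call may block,
    // so each iteration finishes well within its 1-second slot.
    const legacyBatchEventReader = new Reader({
        brokers: ['localhost:9092'],
        topic: 'legacy_batch_topic',
        maxWait: '500ms',
    });

    const legacyBatchReadTrend = new Trend('legacy_batch_read'); // illustrative name

    export const options = {
        scenarios: {
            legacy_batch_consume_scenario: {
                executor: 'constant-arrival-rate',  // one iteration per second,
                exec: 'legacyBatchConsumeScenario', // replacing the old single big loop
                rate: 1,
                timeUnit: '1s',
                duration: '15m',
                preAllocatedVUs: 1,
                maxVUs: 1,
            },
        },
    };

    export function legacyBatchConsumeScenario() {
        // expectTimeout: true => receiving fewer than 'limit' messages is not an error
        const messages = legacyBatchEventReader.consume({ limit: 220, expectTimeout: true });
        legacyBatchReadTrend.add(messages.length); // record messages consumed this second
    }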
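
Example: the slimmed-down summary reporting. handleSummary now passes only what
is used (data.metrics and options.thresholds), and each CSV line is derived by
splitting a k6 threshold condition. A sketch of the idea (the metric name and
threshold value are assumed for illustration; the patch itself additionally
deep-clones the thresholds entry via JSON.parse/JSON.stringify):

    // thresholds: { ncmp_read_duration: ['avg <= 100'] }           (assumed example)
    // metrics:    { ncmp_read_duration: { values: { avg: 87.5 } } }
    function makeSummaryCsvLine(testNumber, testName, unit, measurementName, currentExpectation, metrics, thresholds) {
        const thresholdCondition = thresholds[measurementName][0];   // 'avg <= 100'
        const [metricsFunction, thresholdOperator, thresholdValue] = thresholdCondition.split(/\s+/);
        const actualValue = metrics[measurementName].values[metricsFunction].toFixed(3); // '87.500'
        return `${testNumber},${testName},${unit},${thresholdValue},${currentExpectation},${actualValue}`;
    }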

Issue-ID: CPS-2716
Change-Id: If95645f3eb90ff035a1add07b1e2117b7f944317
Signed-off-by: Toine Siebelink <toine.siebelink@est.tech>
14 files changed:
.gitignore
docker-compose/config/grafana/jvm-micrometer-dashboard.json
docker-compose/cps-base.yml
k6-tests/install-deps.sh
k6-tests/ncmp/common/passthrough-crud.js
k6-tests/ncmp/common/produce-avc-event.js
k6-tests/ncmp/common/utils.js
k6-tests/ncmp/common/write-data-job.js
k6-tests/ncmp/config/endurance.json
k6-tests/ncmp/config/kpi.json
k6-tests/ncmp/config/test-kpi-metadata.json
k6-tests/ncmp/ncmp-test-runner.js
k6-tests/ncmp/register-cmhandles-only.js
k6-tests/teardown.sh

index f7e2bf7..f0f1d15 100755 (executable)
@@ -41,3 +41,5 @@ tmp/
 
 csit/env.properties
 csit/archives/
+
+/k6-tests/image/
\ No newline at end of file
index 6bdc41b..f3fe4f9 100644 (file)
@@ -266,4 +266,4 @@ volumes:
   grafana:
     driver: local
   prometheus_data:
-    driver: local
\ No newline at end of file
+    driver: local
index 393a255..2c4243e 100755 (executable)
@@ -37,11 +37,12 @@ docker-compose version
 
 # Download k6 with kafka extension.
 if [ ! -x bin/k6 ]; then
-  echo " Downloading k6 with kafka extension"
-  curl -s -L https://github.com/mostafa/xk6-kafka/releases/download/v0.26.0/xk6-kafka_v0.26.0_linux_amd64.tar.gz | tar -xz
-  mv dist/xk6-kafka_v0.26.0_linux_amd64 bin/k6 && rmdir dist
+  echo " Installing k6 1.0.0 with kafka extension"
+  curl -s -L https://github.com/mostafa/xk6-kafka/releases/download/v1.0.0/xk6-kafka_v1.0.0_linux_amd64.tar.gz | tar -xz
+  mv dist/xk6-kafka_v1.0.0_linux_amd64 bin/k6 && rmdir dist
   chmod +x bin/k6
 else
   echo " k6 already installed"
 fi
+echo " Checking k6 Version:"
 k6 --version
index e502e5d..ecf252a 100644 (file)
@@ -69,4 +69,4 @@ function generatePassthroughUrl(alternateId, datastoreName, resourceIdentifier,
     const encodedAlternateId = encodeURIComponent(alternateId);
     const descendantsParam = includeDescendants ? `&include-descendants=${includeDescendants}` : '';
     return `${NCMP_BASE_URL}/ncmp/v1/ch/${encodedAlternateId}/data/ds/${datastoreName}?resourceIdentifier=${resourceIdentifier}${descendantsParam}`;
-}
\ No newline at end of file
+}
index 4f25b46..4ddbc43 100644 (file)
@@ -18,7 +18,6 @@
  *  ============LICENSE_END=========================================================
  */
 
-import {crypto} from 'k6/experimental/webcrypto';
 import {check} from 'k6';
 import {Writer, SchemaRegistry, SCHEMA_TYPE_STRING} from 'k6/x/kafka';
 
index 9783377..bb4e5ae 100644 (file)
@@ -23,19 +23,20 @@ import http from 'k6/http';
 import {check} from 'k6';
 import {Trend} from 'k6/metrics';
 
-export const testConfig = JSON.parse(open(`../config/${__ENV.TEST_PROFILE}.json`));
+export const TEST_PROFILE = __ENV.TEST_PROFILE ? __ENV.TEST_PROFILE : 'kpi'
+export const testConfig = JSON.parse(open(`../config/${TEST_PROFILE}.json`));
 export const testKpiMetaData = JSON.parse(open(`../config/test-kpi-metadata.json`));
 export const KAFKA_BOOTSTRAP_SERVERS = testConfig.hosts.kafkaBootstrapServer;
 export const NCMP_BASE_URL = testConfig.hosts.ncmpBaseUrl;
 export const DMI_PLUGIN_URL = testConfig.hosts.dmiStubUrl;
-export const CONTAINER_UP_TIME_IN_SECONDS = testConfig.hosts.containerUpTimeInSeconds;
+export const CONTAINER_COOL_DOWN_TIME_IN_SECONDS = testConfig.hosts.containerCoolDownTimeInSeconds;
 export const LEGACY_BATCH_TOPIC_NAME = 'legacy_batch_topic';
 export const TOTAL_CM_HANDLES = __ENV.TOTAL_CM_HANDLES ? parseInt(__ENV.TOTAL_CM_HANDLES) : 50000;
 export const REGISTRATION_BATCH_SIZE = 2000;
 export const READ_DATA_FOR_CM_HANDLE_DELAY_MS = 300; // must have same value as in docker-compose.yml
 export const WRITE_DATA_FOR_CM_HANDLE_DELAY_MS = 670; // must have same value as in docker-compose.yml
 export const CONTENT_TYPE_JSON_PARAM = {'Content-Type': 'application/json'};
-export const LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE = 200;
+export const LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE = 200; // Note: a maximum batch size of 200 is implemented in production code!
 export const MODULE_SET_TAGS = ['tagA', 'tagB', 'tagC', 'tagD', 'tagE'];
 
 /**
@@ -119,7 +120,7 @@ export function performGetRequest(url, metricTag) {
     return http.get(url, {tags: metricTags});
 }
 
-export function makeCustomSummaryReport(testResults, scenarioConfig) {
+export function makeCustomSummaryReport(metrics, thresholds) {
     const summaryCsvLines = [
         '#,Test Name,Unit,Fs Requirement,Current Expectation,Actual',
         ...testKpiMetaData.map(test => {
@@ -129,20 +130,19 @@ export function makeCustomSummaryReport(testResults, scenarioConfig) {
                 test.unit,
                 test.metric,
                 test.cpsAverage,
-                testResults,
-                scenarioConfig
+                metrics,
+                thresholds
             );
         })
     ];
     return summaryCsvLines.join('\n') + '\n';
 }
 
-function makeSummaryCsvLine(testCase, testName, unit, measurementName, currentExpectation, testResults, scenarioConfig) {
-    const thresholdArray = JSON.parse(JSON.stringify(scenarioConfig.thresholds[measurementName]));
-    const thresholdString = thresholdArray[0];
-    const [thresholdKey, thresholdOperator, thresholdValue] = thresholdString.split(/\s+/);
-    const actualValue = testResults.metrics[measurementName].values[thresholdKey].toFixed(3);
-    return `${testCase},${testName},${unit},${thresholdValue},${currentExpectation},${actualValue}`;
+function makeSummaryCsvLine(testNumber, testName, unit, measurementName, currentExpectation, metrics, thresholds) {
+    const thresholdCondition = JSON.parse(JSON.stringify(thresholds[measurementName]))[0];
+    const [metricsFunction, thresholdOperator, thresholdValue] = thresholdCondition.split(/\s+/);
+    const actualValue = metrics[measurementName].values[metricsFunction].toFixed(3);
+    return `${testNumber},${testName},${unit},${thresholdValue},${currentExpectation},${actualValue}`;
 }
 
 /**
@@ -152,10 +152,10 @@ function makeSummaryCsvLine(testCase, testName, unit, measurementName, currentEx
  * @param {number} expectedStatusCode
  * @param {string} checkLabel
  * @param {number} delayInMs - Overhead to subtract
- * @param {Trend} trendMetric
+ * @param {Trend} testTrend
  */
-export function validateResponseAndRecordMetricWithOverhead(httpResponse, expectedStatusCode, checkLabel, delayInMs, trendMetric) {
-    recordMetricIfResponseValid(httpResponse, expectedStatusCode, checkLabel, trendMetric,
+export function validateResponseAndRecordMetricWithOverhead(httpResponse, expectedStatusCode, checkLabel, delayInMs, testTrend) {
+    recordMetricIfResponseValid(httpResponse, expectedStatusCode, checkLabel, testTrend,
         (httpResponse) => httpResponse.timings.duration - delayInMs
     );
 }
@@ -167,10 +167,10 @@ export function validateResponseAndRecordMetricWithOverhead(httpResponse, expect
  * @param {number} expectedStatusCode
  * @param {string} checkLabel
  * @param {number} expectedArrayLength
- * @param {Trend} trendMetric
+ * @param {Trend} testTrend
  */
-export function validateResponseAndRecordMetric(httpResponse, expectedStatusCode, checkLabel, expectedArrayLength, trendMetric) {
-    recordMetricIfResponseValid(httpResponse, expectedStatusCode, checkLabel, trendMetric, (response) => response.timings.duration, (response) => {
+export function validateResponseAndRecordMetric(httpResponse, expectedStatusCode, checkLabel, expectedArrayLength, testTrend) {
+    recordMetricIfResponseValid(httpResponse, expectedStatusCode, checkLabel, testTrend, (response) => response.timings.duration, (response) => {
         const status = response.status;
         const body = typeof response.body === 'string' ? response.body.trim() : '';
         if (!body) {
@@ -189,7 +189,7 @@ export function validateResponseAndRecordMetric(httpResponse, expectedStatusCode
     });
 }
 
-function recordMetricIfResponseValid(httpResponse, expectedStatusCode, checkLabel, metricRecorder, metricValueExtractor, customValidatorFn = () => ({
+function recordMetricIfResponseValid(httpResponse, expectedStatusCode, checkLabel, testTrend, metricValueExtractor, customValidatorFn = () => ({
     valid: true,
     reason: undefined
 })) {
@@ -201,7 +201,7 @@ function recordMetricIfResponseValid(httpResponse, expectedStatusCode, checkLabe
     });
 
     if (isSuccess) {
-        metricRecorder.add(metricValueExtractor(httpResponse));
+        testTrend.add(metricValueExtractor(httpResponse));
     } else {
         logDetailedFailure(httpResponse, isExpectedStatusMatches, checkLabel, reason);
     }
@@ -247,4 +247,4 @@ function logDetailedFailure(httpResponse, isExpectedStatusMatches, checkLabel, c
         const bodyPreview = trimmedBody.length > 500 ? trimmedBody.slice(0, 500) + '... [truncated]' : trimmedBody;
         console.error(`❌ ${checkLabel}: ${errorCategory}. Status: ${status}, URL: ${url}. Response is not valid JSON.\n↪️ Raw body preview:\n${bodyPreview}\n✳️ Parse error: ${e.message}`);
     }
-}
\ No newline at end of file
+}
index 9da9206..40dc079 100644 (file)
@@ -18,7 +18,6 @@
  *  ============LICENSE_END=========================================================
  */
 
-import {crypto} from 'k6/experimental/webcrypto';
 import {performPostRequest, getRandomAlternateId, NCMP_BASE_URL} from './utils.js';
 
 /**
index d9bdccb..22f502e 100644 (file)
@@ -3,7 +3,7 @@
     "ncmpBaseUrl": "http://localhost:8884",
     "dmiStubUrl": "http://ncmp-dmi-plugin-demo-and-csit-stub:8092",
     "kafkaBootstrapServer": "localhost:9093",
-    "containerUpTimeInSeconds": 420
+    "containerCoolDownTimeInSeconds": 420
   },
   "scenarios": {
     "passthrough_read_alt_id_scenario": {
index eee8398..5f10053 100644 (file)
@@ -3,7 +3,7 @@
     "ncmpBaseUrl": "http://localhost:8883",
     "dmiStubUrl": "http://ncmp-dmi-plugin-demo-and-csit-stub:8092",
     "kafkaBootstrapServer": "localhost:9092",
-    "containerUpTimeInSeconds": 300
+    "containerCoolDownTimeInSeconds": 10
   },
   "scenarios": {
     "passthrough_read_alt_id_scenario": {
       "rate": 1,
       "preAllocatedVUs": 1,
       "timeUnit": "1s",
-      "duration": "15m10s",
-      "maxVUs": 2,
+      "duration": "15m5s",
+      "maxVUs": 1,
       "startTime": "71ms"
     },
     "legacy_batch_consume_scenario": {
-      "executor": "per-vu-iterations",
+      "executor": "constant-arrival-rate",
       "exec": "legacyBatchConsumeScenario",
-      "vus": 1,
-      "iterations": 1,
-      "maxDuration": "16m",
-      "startTime": "71ms"
+      "rate": 1,
+      "timeUnit": "1s",
+      "duration": "15m",
+      "preAllocatedVUs": 1,
+      "maxVUs": 1,
+      "startTime": "571ms"
     },
     "produceCmAvcBackGroundLoadAtPeakRate": {
       "executor": "constant-arrival-rate",
   },
   "thresholds": "#SCENARIO-THRESHOLDS#"
 }
+
index 17e3ec3..cfbc6ea 100644 (file)
     "cpsAverage": 8000,
     "kpiThreshold": 30000
   }
-]
\ No newline at end of file
+]
index 880e541..93bc320 100644 (file)
 import { check, sleep } from 'k6';
 import { Trend } from 'k6/metrics';
 import { Reader } from 'k6/x/kafka';
-import { testConfig, validateResponseAndRecordMetricWithOverhead,
-    validateResponseAndRecordMetric, makeCustomSummaryReport, makeBatchOfCmHandleIds, makeRandomBatchOfAlternateIds,
-    TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
-    LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE, REGISTRATION_BATCH_SIZE,
-    KAFKA_BOOTSTRAP_SERVERS, LEGACY_BATCH_TOPIC_NAME, CONTAINER_UP_TIME_IN_SECONDS
+import { testConfig,
+    validateResponseAndRecordMetricWithOverhead,
+    validateResponseAndRecordMetric,
+    makeCustomSummaryReport,
+    makeBatchOfCmHandleIds,
+    makeRandomBatchOfAlternateIds,
+    TOTAL_CM_HANDLES,
+    READ_DATA_FOR_CM_HANDLE_DELAY_MS,
+    WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
+    LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE,
+    REGISTRATION_BATCH_SIZE,
+    KAFKA_BOOTSTRAP_SERVERS,
+    LEGACY_BATCH_TOPIC_NAME,
+    CONTAINER_COOL_DOWN_TIME_IN_SECONDS
 } from './common/utils.js';
 import { createCmHandles, deleteCmHandles, waitForAllCmHandlesToBeReady } from './common/cmhandle-crud.js';
 import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js';
@@ -40,6 +49,7 @@ const EXPECTED_WRITE_RESPONSE_COUNT = 1;
 export const legacyBatchEventReader = new Reader({
     brokers: [KAFKA_BOOTSTRAP_SERVERS],
     topic: LEGACY_BATCH_TOPIC_NAME,
+    maxWait: '500ms', // Do not increase, otherwise the read won't finish within 1 second (it is set to run every second)
 });
 
 export const options = {
@@ -52,8 +62,8 @@ export const options = {
 export function setup() {
     const startTimeInMillis = Date.now();
 
-    const TOTAL_BATCHES = Math.ceil(TOTAL_CM_HANDLES / REGISTRATION_BATCH_SIZE);
-    for (let batchNumber = 0; batchNumber < TOTAL_BATCHES; batchNumber++) {
+    const numberOfBatches = Math.ceil(TOTAL_CM_HANDLES / REGISTRATION_BATCH_SIZE);
+    for (let batchNumber = 0; batchNumber < numberOfBatches; batchNumber++) {
         const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(REGISTRATION_BATCH_SIZE, batchNumber);
         const response = createCmHandles(nextBatchOfCmHandleIds);
         check(response, { 'create CM-handles status equals 200': (response) => response.status === 200 });
@@ -70,13 +80,13 @@ export function setup() {
 export function teardown() {
     const startTimeInMillis = Date.now();
 
-    let DEREGISTERED_CM_HANDLES = 0
-    const TOTAL_BATCHES = Math.ceil(TOTAL_CM_HANDLES / REGISTRATION_BATCH_SIZE);
-    for (let batchNumber = 0; batchNumber < TOTAL_BATCHES; batchNumber++) {
+    let numberOfDeregisteredCmHandles = 0
+    const numberOfBatches = Math.ceil(TOTAL_CM_HANDLES / REGISTRATION_BATCH_SIZE);
+    for (let batchNumber = 0; batchNumber < numberOfBatches; batchNumber++) {
         const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(REGISTRATION_BATCH_SIZE, batchNumber);
         const response = deleteCmHandles(nextBatchOfCmHandleIds);
         if (response.error_code === 0) {
-              DEREGISTERED_CM_HANDLES += REGISTRATION_BATCH_SIZE
+              numberOfDeregisteredCmHandles += REGISTRATION_BATCH_SIZE
         }
         check(response, { 'delete CM-handles status equals 200': (response) => response.status === 200 });
     }
@@ -84,9 +94,9 @@ export function teardown() {
     const endTimeInMillis = Date.now();
     const totalDeregistrationTimeInSeconds = (endTimeInMillis - startTimeInMillis) / 1000.0;
 
-    cmHandlesDeletedTrend.add(DEREGISTERED_CM_HANDLES / totalDeregistrationTimeInSeconds);
+    cmHandlesDeletedTrend.add(numberOfDeregisteredCmHandles / totalDeregistrationTimeInSeconds);
 
-    sleep(CONTAINER_UP_TIME_IN_SECONDS);
+    sleep(CONTAINER_COOL_DOWN_TIME_IN_SECONDS);
 }
 
 export function passthroughReadAltIdScenario() {
@@ -150,9 +160,25 @@ export function cmHandleSearchTrustLevelScenario() {
 }
 
 export function legacyBatchProduceScenario() {
+    const timestamp1 = (new Date()).toISOString();
     const nextBatchOfAlternateIds = makeRandomBatchOfAlternateIds();
     const response = legacyBatchRead(nextBatchOfAlternateIds);
     check(response, {'data operation batch read status equals 200': (response) => response.status === 200});
+    const timestamp2 = (new Date()).toISOString();
+    console.debug(`✅ From ${timestamp1} to ${timestamp2} produced ${LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE} messages for legacy batch read`);
+}
+
+export function legacyBatchConsumeScenario() {
+    const timestamp1 = (new Date()).toISOString();
+    try {
+        const messages = legacyBatchEventReader.consume({ limit: 220, expectTimeout: true });
+        const timestamp2 = (new Date()).toISOString();
+        console.debug(`✅ From ${timestamp1} to ${timestamp2} consumed ${messages.length} messages by legacy batch read`);
+        legacyBatchReadTrend.add(messages.length);
+    } catch (error) {
+        const timestamp2 = (new Date()).toISOString();
+        console.error(`❌ From ${timestamp1} to ${timestamp2} Consume error (legacy batch read): ${error.message}`);
+    }
 }
 
 export function writeDataJobLargeScenario() {
@@ -169,55 +195,12 @@ export function produceAvcEventsScenario() {
     sendBatchOfKafkaMessages(500);
 }
 
-export function legacyBatchConsumeScenario() {
-    // calculate total messages 15 minutes times 60 seconds times
-    const TOTAL_MESSAGES_TO_CONSUME = 15 * 60 * LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE;
-    console.log("📥 [legacy batch consume scenario] Starting consumption of", TOTAL_MESSAGES_TO_CONSUME, "messages...");
-
-    try {
-        let messagesConsumed = 0;
-        const startTime = Date.now();
-
-        while (messagesConsumed < TOTAL_MESSAGES_TO_CONSUME) {
-            try {
-                const messages = legacyBatchEventReader.consume({
-                    limit: LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE,
-                    timeout: 30000,
-                });
-
-                if (messages.length > 0) {
-                    messagesConsumed += messages.length;
-                    console.debug(`✅ Consumed ${messages.length} messages by legacy batch read (total: ${messagesConsumed}/${TOTAL_MESSAGES_TO_CONSUME})`);
-                } else {
-                    console.warn("⚠️ No messages received by legacy batch read.");
-                }
-            } catch (err) {
-                console.error(`❌ Consume error (legacy batch read): ${err.message}`);
-            }
-        }
-
-        const endTime = Date.now();
-        const timeToConsumeMessagesInSeconds = (endTime - startTime) / 1000.0;
-
-        if (messagesConsumed > 0) {
-            legacyBatchReadTrend.add(messagesConsumed / timeToConsumeMessagesInSeconds);
-            console.log(`🏁 Finished (legacy batch read): Consumed ${messagesConsumed} messages in ${timeToConsumeMessagesInSeconds.toFixed(2)}s.`);
-        } else {
-            legacyBatchReadCmhandlesPerSecondTrend.add(0);
-            console.error("⚠️ No messages consumed by legacy read batch.");
-        }
-    } catch (error) {
-        legacyBatchReadTrend.add(0);
-        console.error("💥 Legacy batch read scenario failed:", error.message);
-    }
-}
-
 export function handleSummary(data) {
     const testProfile = __ENV.TEST_PROFILE;
     if (testProfile === 'kpi') {
         console.log("✅ Generating KPI summary...");
         return {
-            stdout: makeCustomSummaryReport(data, options),
+            stdout: makeCustomSummaryReport(data.metrics, options.thresholds),
         };
     }
     console.log("⛔ Skipping KPI summary (not in 'kpi' profile)");
index 18c2f85..28c2202 100644 (file)
@@ -34,8 +34,8 @@ import { createCmHandles, waitForAllCmHandlesToBeReady } from './common/cmhandle
  * The number of handles to be registered is TOTAL_CM_HANDLES defined in common/utils.js
  */
 export default function () {
-    const TOTAL_BATCHES = Math.ceil(TOTAL_CM_HANDLES / REGISTRATION_BATCH_SIZE);
-    for (let batchNumber = 0; batchNumber < TOTAL_BATCHES; batchNumber++) {
+    const numberOfBatches = Math.ceil(TOTAL_CM_HANDLES / REGISTRATION_BATCH_SIZE);
+    for (let batchNumber = 0; batchNumber < numberOfBatches; batchNumber++) {
         const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(REGISTRATION_BATCH_SIZE, batchNumber);
         const response = createCmHandles(nextBatchOfCmHandleIds);
         check(response, { 'create CM-handles status equals 200': (r) => r.status === 200 });
index bbe6c24..0f89652 100755 (executable)
@@ -49,10 +49,11 @@ remove_all_onap_docker_images() {
 # Set an environment variable CLEAN_DOCKER_IMAGES=1 to also remove docker images when done (used on jenkins job)
 echo "Stopping, Removing containers and volumes for $testProfile tests..."
 if [[ "${CLEAN_DOCKER_IMAGES:-0}" -eq 1 ]]; then
+  echo "Also cleaning up all images"
   eval "$docker_compose_shutdown_cmd --rmi all"
   if [[ "$testProfile" == "endurance" ]]; then
     remove_all_onap_docker_images
   fi
 else
   eval "$docker_compose_shutdown_cmd"
-fi
\ No newline at end of file
+fi