Implemented parallel execution for writeDataJob using ExecutorService with 10 concurr... 94/140494/4
authorsourabh_sourabh <sourabh.sourabh@est.tech>
Tue, 18 Mar 2025 15:09:45 +0000 (15:09 +0000)
committersourabh_sourabh <sourabh.sourabh@est.tech>
Fri, 21 Mar 2025 13:16:11 +0000 (13:16 +0000)
Issue-ID: CPS-2692
Change-Id: I497e2e626e60b08c6cf28ffa94884808d68a1dd9
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java
integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy

index 775e9d7..3c98d69 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024-2025 Nordix Foundation.
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ public abstract class ServiceConfig {
     private String connectionProviderName = "";
     private int maximumInMemorySizeInMegabytes = 1;
     private int maximumConnectionsTotal = 1;
-    private int pendingAcquireMaxCount = 1;
+    private int pendingAcquireMaxCount = 10;
     private Integer connectionTimeoutInSeconds = 1;
     private long readTimeoutInSeconds = 1;
     private long writeTimeoutInSeconds = 1;
index c714260..de7ffab 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2025 Nordix Foundation
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the 'License');
  *  you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@ import org.onap.cps.ncmp.api.datajobs.models.DataJobWriteRequest
 import org.onap.cps.ncmp.api.datajobs.models.WriteOperation
 import org.springframework.beans.factory.annotation.Autowired
 import spock.lang.Ignore
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Executors
 
 /**
  * This test does not depend on common performance test data. Hence it just extends the integration spec base.
@@ -37,8 +39,6 @@ class WriteDataJobPerfTest extends CpsIntegrationSpecBase {
     @Autowired
     DataJobService dataJobService
 
-    def resourceMeter = new ResourceMeter()
-
     def populateDataJobWriteRequests(int numberOfWriteOperations) {
         def writeOperations = []
         for (int i = 1; i <= numberOfWriteOperations; i++) {
@@ -52,16 +52,61 @@ class WriteDataJobPerfTest extends CpsIntegrationSpecBase {
 
     @Ignore  // CPS-2691
     def 'Performance test for writeDataJob method'() {
-        given: 'register 10_000 cm handles (with alternative ids)'
-        registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 10_000, 1, ModuleNameStrategy.UNIQUE, { it -> "/SubNetwork=Europe/SubNetwork=Ireland/MeContext=MyRadioNode${it}/ManagedElement=MyManagedElement${it}" })
+        given: 'register 10_000 cm handles (with alternate ids)'
+            registerTestCmHandles(10_000)
             def dataJobWriteRequest = populateDataJobWriteRequests(10_000)
         when: 'sending a write job to NCMP with dynamically generated write operations'
-            resourceMeter.start()
-            dataJobService.writeDataJob('', '', new DataJobMetadata('d1', '', ''), dataJobWriteRequest)
-            resourceMeter.stop()
+            def executionResult = executeWriteJob('d1', dataJobWriteRequest)
         then: 'record the result. Not asserted, just recorded in See https://lf-onap.atlassian.net/browse/CPS-2691'
-            println "*** CPS-2691 Execution time: ${resourceMeter.totalTimeInSeconds} seconds"
+            println "*** CPS-2691 Execution time: ${executionResult.executionTime} seconds | Memory usage: ${executionResult.memoryUsage} MB"
+        cleanup: 'deregister test cm handles'
+            deregisterTestCmHandles(10_000)
+    }
+
+    @Ignore  // CPS-2692
+    def 'Performance test for writeDataJob method with 10 parallel requests'() {
+        given: 'register 10_000 cm handles (with alternate ids)'
+            registerTestCmHandles(1_000)
+        when: 'sending 10 parallel write jobs to NCMP'
+            def executionResults = executeParallelWriteJobs(10, 1_000)
+        then: 'record execution times'
+            executionResults.eachWithIndex { result, index ->
+                logExecutionResults("CPS-2692 Job-${index + 1}", result)
+            }
         cleanup: 'deregister test cm handles'
-            deregisterSequenceOfCmHandles(DMI1_URL, 10_000, 1)
+            deregisterSequenceOfCmHandles(DMI1_URL, 1_000, 1)
+    }
+
+    def registerTestCmHandles(numberOfCmHandles) {
+        registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(
+                DMI1_URL, "tagA", numberOfCmHandles, 1, ModuleNameStrategy.UNIQUE,
+                { "/SubNetwork=Europe/SubNetwork=Ireland/MeContext=MyRadioNode${it}/ManagedElement=MyManagedElement${it}" }
+        )
+    }
+
+    def executeParallelWriteJobs(numberOfJobs, numberOfWriteOperations) {
+        def executorService = Executors.newFixedThreadPool(numberOfJobs)
+        def futures = (0..<numberOfJobs).collect { jobId ->
+            CompletableFuture.supplyAsync({ -> executeWriteJob(jobId, populateDataJobWriteRequests(numberOfWriteOperations)) }, executorService)
+        }
+        def executionResults = futures.collect { it.join() }
+        executorService.shutdown()
+        return executionResults
+    }
+
+    def executeWriteJob(jobId, dataJobWriteRequest) {
+        def localMeter = new ResourceMeter()
+        localMeter.start()
+        dataJobService.writeDataJob('', '', new DataJobMetadata("job-${jobId}", '', ''), dataJobWriteRequest)
+        localMeter.stop()
+        ['executionTime': localMeter.totalTimeInSeconds, 'memoryUsage': localMeter.totalMemoryUsageInMB]
+    }
+
+    def logExecutionResults(jobId, result) {
+        println "*** ${jobId} Execution time: ${result.executionTime} seconds | Memory usage: ${result.memoryUsage} MB"
+    }
+
+    def deregisterTestCmHandles(numberOfCmHandles) {
+        deregisterSequenceOfCmHandles(DMI1_URL, numberOfCmHandles, 1)
     }
 }