CmHandle registration/module sync watchdog performance improvment 05/130805/6
authorsourabh_sourabh <sourabh.sourabh@est.tech>
Thu, 8 Sep 2022 11:35:04 +0000 (12:35 +0100)
committerSourabh Sourabh <sourabh.sourabh@est.tech>
Tue, 13 Sep 2022 15:06:23 +0000 (15:06 +0000)
- Tuned CPS DB parameters
- Removed deprecated CPS DB parameter
- Modified module sync watch logic
- Added aditional logs
- Removed logic to get FragmentEntities as stream
- Removed detaching of fragment entity as it was causing NCMP to break with an exception "SQLSTATE(08006)"

Issue-ID: CPS-1126
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Change-Id: I0bde11895f754602dece132efde701d82b377f12

cps-application/src/main/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
cps-ri/src/main/java/org/onap/cps/spi/repository/FragmentRepositoryCpsPathQueryImpl.java

index 9b6f41e..56515df 100644 (file)
@@ -45,13 +45,12 @@ spring:
         username: ${DB_USERNAME}\r
         password: ${DB_PASSWORD}\r
         driverClassName: org.postgresql.Driver\r
-        initialization-mode: always\r
         hikari:\r
             minimumIdle: 5\r
             maximumPoolSize: 80\r
-            idleTimeout: 120000\r
-            connectionTimeout: 300000\r
-            leakDetectionThreshold: 300000\r
+            idleTimeout: 60000\r
+            connectionTimeout: 120000\r
+            leakDetectionThreshold: 2000\r
             pool-name: CpsDatabasePool\r
 \r
     cache:\r
@@ -91,6 +90,9 @@ spring:
       default-property-inclusion: NON_NULL\r
       serialization:\r
         FAIL_ON_EMPTY_BEANS: false\r
+    sql:\r
+      init:\r
+        mode: ALWAYS\r
 app:\r
     ncmp:\r
         async-m2m:\r
index d457f26..d5b459b 100644 (file)
@@ -56,7 +56,7 @@ public class DmiRestClient {
         } catch (final HttpStatusCodeException httpStatusCodeException) {
             final String exceptionMessage = "Unable to " + operation.toString() + " resource data.";
             throw new HttpClientRequestException(exceptionMessage, httpStatusCodeException.getResponseBodyAsString(),
-                httpStatusCodeException.getRawStatusCode());
+                    httpStatusCodeException.getRawStatusCode());
         }
     }
 
index ada3dc6..f914547 100644 (file)
@@ -71,6 +71,7 @@ public class ModuleSyncTasks {
                     moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
                     cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
                 } catch (final Exception e) {
+                    log.warn("Processing module sync batch failed.");
                     syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
                             LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
                     setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
@@ -81,6 +82,7 @@ public class ModuleSyncTasks {
             lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandelStatePerCmHandle);
         } finally {
             batchCounter.getAndDecrement();
+            log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
         }
         return COMPLETED_FUTURE;
     }
index 73954c3..64d111f 100644 (file)
@@ -62,16 +62,22 @@ public class ModuleSyncWatchdog {
      */
     @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}")
     public void moduleSyncAdvisedCmHandles() {
+        log.info("Processing module sync watchdog waking up.");
         populateWorkQueueIfNeeded();
         final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel();
-        while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) {
-            batchCounter.getAndIncrement();
-            final Collection<DataNode> nextBatch = prepareNextBatch();
-            asyncTaskExecutor.executeTask(() ->
-                            moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
-                    ASYNC_TASK_TIMEOUT_IN_MILLISECONDS
-            );
-            preventBusyWait();
+        while (!moduleSyncWorkQueue.isEmpty()) {
+            if (batchCounter.get() <= asyncTaskParallelismLevel) {
+                final Collection<DataNode> nextBatch = prepareNextBatch();
+                log.debug("Processing module sync batch of {}. {} batch(es) active.",
+                        nextBatch.size(), batchCounter.get());
+                asyncTaskExecutor.executeTask(() ->
+                                moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
+                        ASYNC_TASK_TIMEOUT_IN_MILLISECONDS
+                );
+                batchCounter.getAndIncrement();
+            } else {
+                preventBusyWait();
+            }
         }
     }
 
@@ -80,6 +86,7 @@ public class ModuleSyncWatchdog {
      */
     @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
     public void resetPreviouslyFailedCmHandles() {
+        log.info("Processing module sync retry-watchdog waking up.");
         final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
         moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
     }
@@ -95,6 +102,7 @@ public class ModuleSyncWatchdog {
     private void populateWorkQueueIfNeeded() {
         if (moduleSyncWorkQueue.isEmpty()) {
             final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles();
+            log.info("Processing module sync fetched {} advised cm handles from DB", advisedCmHandles.size());
             for (final DataNode advisedCmHandle : advisedCmHandles) {
                 if (!moduleSyncWorkQueue.offer(advisedCmHandle)) {
                     log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
index e5240c0..dd989bf 100644 (file)
@@ -52,20 +52,19 @@ class ModuleSyncWatchdogSpec extends Specification {
     def 'Module sync advised cm handles with #scenario.'() {
         given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
             mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
-        and: 'the executor has #parallelismLevel available threads'
-            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel
+        and: 'the executor has enough available threads'
+            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
         when: ' module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it performs #expectedNumberOfTaskExecutions tasks'
             expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
         where: ' the following parameter are used'
-            scenario              | parallelismLevel | numberOfAdvisedCmHandles                                          || expectedNumberOfTaskExecutions
-            'less then 1 batch'   | 9                | 1                                                                 || 1
-            'exactly 1 batch'     | 9                | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
-            '2 batches'           | 9                | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
-            'queue capacity'      | 9                | testQueueCapacity                                                 || 3
-            'over queue capacity' | 9                | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
-            'not enough threads'  | 2                | testQueueCapacity                                                 || 2
+            scenario              | numberOfAdvisedCmHandles                                          || expectedNumberOfTaskExecutions
+            'less then 1 batch'   | 1                                                                 || 1
+            'exactly 1 batch'     | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
+            '2 batches'           | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
+            'queue capacity'      | testQueueCapacity                                                 || 3
+            'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
     }
 
     def 'Reset failed cm handles.'() {
index f07f7f8..47a3e8f 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.onap.cps.spi.repository;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,12 +28,14 @@ import javax.persistence.PersistenceContext;
 import javax.persistence.Query;
 import javax.transaction.Transactional;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.cpspath.parser.CpsPathPrefixType;
 import org.onap.cps.cpspath.parser.CpsPathQuery;
 import org.onap.cps.spi.entities.FragmentEntity;
 import org.onap.cps.utils.JsonObjectMapper;
 
 @RequiredArgsConstructor
+@Slf4j
 public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCpsPathQuery {
 
     public static final String SIMILAR_TO_ABSOLUTE_PATH_PREFIX = "%/";
@@ -62,16 +63,8 @@ public class FragmentRepositoryCpsPathQueryImpl implements FragmentRepositoryCps
         addTextFunctionCondition(cpsPathQuery, sqlStringBuilder, queryParameters);
         final Query query = entityManager.createNativeQuery(sqlStringBuilder.toString(), FragmentEntity.class);
         setQueryParameters(query, queryParameters);
-        return getFragmentEntitiesAsStream(query);
-    }
-
-    private List<FragmentEntity> getFragmentEntitiesAsStream(final Query query) {
-        final List<FragmentEntity> fragmentEntities = new ArrayList<>();
-        query.getResultStream().forEach(fragmentEntity -> {
-            fragmentEntities.add((FragmentEntity) fragmentEntity);
-            entityManager.detach(fragmentEntity);
-        });
-
+        final List<FragmentEntity> fragmentEntities = query.getResultList();
+        log.debug("Fetched {} fragment entities by anchor and cps path.", fragmentEntities.size());
         return fragmentEntities;
     }