Test to highlight ModuleSetTag Inefficiencies 44/139344/10
authorToineSiebelink <toine.siebelink@est.tech>
Tue, 5 Nov 2024 12:04:03 +0000 (12:04 +0000)
committerToineSiebelink <toine.siebelink@est.tech>
Mon, 18 Nov 2024 09:27:07 +0000 (09:27 +0000)
- Add (micrometer) instrumentation to expose inefficiencies
- Add test config for micrometer
- Add setup methods in base to create many cm handles
- Set module sync parallelism to 2 for testing
- Add clean up methods for hazelcast related tests
- added test to show inefficiencies
- POC 1 use hazelcast set to prevent multiple threads working on same ModuleSetTag
- POC 2 'cache' module set tags per thread to prevent DB looks ups
- Main inefficiency left: create schemaset for EACH cm Handled even if same tag. No easy PoC...

Change-Id: Idf46b44c475a24727dd7084bb613459f4c29be55
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
17 files changed:
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/DmiModelOperations.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncServiceSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy
cps-ri/src/main/java/org/onap/cps/ri/CpsModulePersistenceServiceImpl.java
cps-service/src/main/java/org/onap/cps/api/impl/CpsModuleServiceImpl.java
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/PolicyExecutorIntegrationSpec.groovy
integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java [new file with mode: 0644]
integration-test/src/test/resources/application.yml

index 109a541..345eefe 100644 (file)
@@ -51,6 +51,7 @@ public class HazelcastCacheConfig {
 
     protected HazelcastInstance getOrCreateHazelcastInstance(final NamedConfig namedConfig) {
         return Hazelcast.getOrCreateHazelcastInstance(defineInstanceConfig(instanceConfigName, namedConfig));
+
     }
 
     private Config defineInstanceConfig(final String instanceConfigName, final NamedConfig namedConfig) {
index 8ba70b3..a056efd 100644 (file)
@@ -26,6 +26,7 @@ import static org.onap.cps.ncmp.impl.models.RequiredDmiService.MODEL;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
+import io.micrometer.core.annotation.Timed;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.inventory.models.YangResource;
 import org.onap.cps.ncmp.impl.dmi.DmiProperties;
 import org.onap.cps.ncmp.impl.dmi.DmiRestClient;
@@ -48,6 +50,7 @@ import org.springframework.stereotype.Service;
 /**
  * Operations class for DMI Model.
  */
+@Slf4j
 @RequiredArgsConstructor
 @Service
 public class DmiModelOperations {
@@ -62,6 +65,8 @@ public class DmiModelOperations {
      * @param yangModelCmHandle the yang model cm handle
      * @return module references
      */
+    @Timed(value = "cps.ncmp.inventory.module.references.from.dmi",
+        description = "Time taken to get all module references for a cm handle from dmi")
     public List<ModuleReference> getModuleReferences(final YangModelCmHandle yangModelCmHandle) {
         final DmiRequestBody dmiRequestBody = DmiRequestBody.builder()
                 .moduleSetTag(yangModelCmHandle.getModuleSetTag()).build();
@@ -79,6 +84,8 @@ public class DmiModelOperations {
      * @param newModuleReferences the unknown module references
      * @return yang resources as map of module name to yang(re)source
      */
+    @Timed(value = "cps.ncmp.inventory.yang.resources.from.dmi",
+        description = "Time taken to get list of yang resources from dmi")
     public Map<String, String> getNewYangResourcesFromDmi(final YangModelCmHandle yangModelCmHandle,
                                                           final Collection<ModuleReference> newModuleReferences) {
         if (newModuleReferences.isEmpty()) {
index ca0f1c6..ba50dd3 100644 (file)
@@ -26,16 +26,20 @@ import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY
 import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT;
 import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME;
 
+import com.hazelcast.collection.ISet;
 import java.time.OffsetDateTime;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.logging.log4j.util.Strings;
 import org.onap.cps.api.CpsAnchorService;
 import org.onap.cps.api.CpsDataService;
 import org.onap.cps.api.CpsModuleService;
+import org.onap.cps.ncmp.api.exceptions.NcmpException;
 import org.onap.cps.ncmp.impl.inventory.models.CmHandleState;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
 import org.onap.cps.spi.CascadeDeleteAllowed;
@@ -50,12 +54,15 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class ModuleSyncService {
 
+    private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap();
+
     private final DmiModelOperations dmiModelOperations;
     private final CpsModuleService cpsModuleService;
     private final CpsDataService cpsDataService;
     private final CpsAnchorService cpsAnchorService;
     private final JsonObjectMapper jsonObjectMapper;
-    private static final Map<String, String> NO_NEW_MODULES = Collections.emptyMap();
+    private final ISet<String> moduleSetTagsBeingProcessed;
+    private final Map<String, ModuleDelta> privateModuleSetCache = new HashMap<>();
 
     @AllArgsConstructor
     private static final class ModuleDelta {
@@ -69,11 +76,37 @@ public class ModuleSyncService {
      * @param yangModelCmHandle the yang model of cm handle.
      */
     public void syncAndCreateSchemaSetAndAnchor(final YangModelCmHandle yangModelCmHandle) {
-        final ModuleDelta moduleDelta = getModuleDelta(yangModelCmHandle, yangModelCmHandle.getModuleSetTag());
-        final String cmHandleId = yangModelCmHandle.getId();
-        cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+        final String moduleSetTag = yangModelCmHandle.getModuleSetTag();
+        final ModuleDelta moduleDelta;
+        boolean isNewModuleSetTag = Strings.isNotBlank(moduleSetTag);
+        try {
+            if (privateModuleSetCache.containsKey(moduleSetTag)) {
+                moduleDelta = privateModuleSetCache.get(moduleSetTag);
+            } else {
+                if (isNewModuleSetTag) {
+                    if (moduleSetTagsBeingProcessed.add(moduleSetTag)) {
+                        log.info("Processing new module set tag {}", moduleSetTag);
+                    } else {
+                        isNewModuleSetTag = false;
+                        throw new NcmpException("Concurrent processing of module set tag " + moduleSetTag,
+                            moduleSetTag + " already being processed for cm handle " + yangModelCmHandle.getId());
+                    }
+                }
+                moduleDelta = getModuleDelta(yangModelCmHandle, moduleSetTag);
+            }
+            final String cmHandleId = yangModelCmHandle.getId();
+            cpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
                 moduleDelta.newModuleNameToContentMap, moduleDelta.allModuleReferences);
-        cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId);
+            cpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cmHandleId);
+            if (isNewModuleSetTag) {
+                final ModuleDelta noModuleDelta = new ModuleDelta(moduleDelta.allModuleReferences, NO_NEW_MODULES);
+                privateModuleSetCache.put(moduleSetTag, noModuleDelta);
+            }
+        } finally {
+            if (isNewModuleSetTag) {
+                moduleSetTagsBeingProcessed.remove(moduleSetTag);
+            }
+        }
     }
 
     /**
@@ -105,6 +138,10 @@ public class ModuleSyncService {
         }
     }
 
+    public void clearPrivateModuleSetCache() {
+        privateModuleSetCache.clear();
+    }
+
     private ModuleDelta getModuleDelta(final YangModelCmHandle yangModelCmHandle, final String targetModuleSetTag) {
         final Map<String, String> newYangResources;
         Collection<ModuleReference> allModuleReferences = getModuleReferencesByModuleSetTag(targetModuleSetTag);
@@ -120,7 +157,7 @@ public class ModuleSyncService {
     }
 
     private Collection<ModuleReference> getModuleReferencesByModuleSetTag(final String moduleSetTag) {
-        if (moduleSetTag == null || moduleSetTag.trim().isEmpty()) {
+        if (Strings.isBlank(moduleSetTag)) {
             return Collections.emptyList();
         }
         return cpsModuleService.getModuleReferencesByAttribute(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
index 31fcbad..7cc74a3 100644 (file)
@@ -63,7 +63,8 @@ public class ModuleSyncTasks {
         try {
             cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> {
                 final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode);
-                cmHandleStatePerCmHandle.put(yangModelCmHandle, processCmHandle(yangModelCmHandle));
+                final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle);
+                cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState);
             });
         } finally {
             batchCounter.getAndDecrement();
@@ -127,4 +128,4 @@ public class ModuleSyncTasks {
             log.info("{} removed from in progress map", resetCmHandleId);
         }
     }
-}
\ No newline at end of file
+}
index 1f33cc3..b98075c 100644 (file)
 
 package org.onap.cps.ncmp.impl.inventory.sync;
 
+import com.hazelcast.collection.ISet;
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.config.QueueConfig;
+import com.hazelcast.config.SetConfig;
 import com.hazelcast.map.IMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.locks.Lock;
@@ -44,6 +46,8 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
     private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig");
     private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig");
     private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
+    private static final SetConfig moduleSetTagsBeingProcessedConfig
+        = createSetConfig("moduleSetTagsBeingProcessedConfig");
     private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock";
 
     /**
@@ -63,8 +67,7 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
      */
     @Bean
     public IMap<String, Object> moduleSyncStartedOnCmHandles() {
-        return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap(
-                "moduleSyncStartedOnCmHandles");
+        return getOrCreateHazelcastInstance(moduleSyncStartedConfig).getMap("moduleSyncStartedOnCmHandles");
     }
 
     /**
@@ -77,6 +80,17 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
         return getOrCreateHazelcastInstance(dataSyncSemaphoresConfig).getMap("dataSyncSemaphores");
     }
 
+    /**
+     * Collection of (new) module set tags being processed.
+     * To prevent processing on multiple threads of same tag
+     *
+     * @return set of module set tags being processed
+     */
+    @Bean
+    public ISet<String> moduleSetTagsBeingProcessed() {
+        return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed");
+    }
+
     /**
      * Retrieves a distributed lock used to control access to the work queue for module synchronization.
      * This lock ensures that the population and modification of the work queue are thread-safe and
index 0bd8384..c08ff75 100644 (file)
@@ -22,12 +22,17 @@ package org.onap.cps.ncmp.impl.cache
 
 import com.hazelcast.config.Config
 import com.hazelcast.config.RestEndpointGroup
+import com.hazelcast.core.Hazelcast
 import spock.lang.Specification
 
 class HazelcastCacheConfigSpec extends Specification {
 
     def objectUnderTest = new HazelcastCacheConfig()
 
+    def cleanupSpec() {
+        Hazelcast.getHazelcastInstanceByName('my instance config').shutdown()
+    }
+
     def 'Create Hazelcast instance with a #scenario'() {
         given: 'a cluster name and instance config name'
             objectUnderTest.clusterName = 'my cluster'
index 6030e5d..2f13a9a 100644 (file)
 
 package org.onap.cps.ncmp.impl.inventory.sync
 
+import com.hazelcast.collection.ISet
 import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDataService
 import org.onap.cps.api.CpsModuleService
+import org.onap.cps.ncmp.api.exceptions.NcmpException
 import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.impl.inventory.CmHandleQueryService
 import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService.ModuleDelta
 import org.onap.cps.spi.CascadeDeleteAllowed
 import org.onap.cps.spi.exceptions.SchemaSetNotFoundException
 import org.onap.cps.spi.model.ModuleReference
@@ -45,18 +48,22 @@ class ModuleSyncServiceSpec extends Specification {
     def mockCmHandleQueries = Mock(CmHandleQueryService)
     def mockCpsDataService = Mock(CpsDataService)
     def mockJsonObjectMapper = Mock(JsonObjectMapper)
+    def mockModuleSetTagsBeingProcessed = Mock(ISet<String>);
 
-    def objectUnderTest = new ModuleSyncService(mockDmiModelOperations, mockCpsModuleService,
-            mockCpsDataService, mockCpsAnchorService, mockJsonObjectMapper)
+    def objectUnderTest = new ModuleSyncService(mockDmiModelOperations, mockCpsModuleService, mockCpsDataService, mockCpsAnchorService, mockJsonObjectMapper, mockModuleSetTagsBeingProcessed)
 
     def expectedDataspaceName = NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME
 
-    def 'Sync model for a NEW cm handle using module set tags: #scenario.'() {
-        given: 'a cm handle state to be synced'
-            def ncmpServiceCmHandle = new NcmpServiceCmHandle()
-            ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).build())
-            ncmpServiceCmHandle.cmHandleId = 'ch-1'
-            def yangModelCmHandle = YangModelCmHandle.toYangModelCmHandle('some service name', '', '', ncmpServiceCmHandle, moduleSetTag, '', '')
+    def setup() {
+        // Allow tags for al test except 'duplicate-processing-tag' to be added to processing semaphore
+        mockModuleSetTagsBeingProcessed.add('new-tag') >> true
+        mockModuleSetTagsBeingProcessed.add('same-tag') >> true
+        mockModuleSetTagsBeingProcessed.add('cached-tag') >> true
+    }
+
+    def 'Sync models for a NEW cm handle using module set tags: #scenario.'() {
+        given: 'a cm handle to be synced'
+            def yangModelCmHandle = createAdvisedCmHandle(moduleSetTag)
         and: 'DMI operations returns some module references'
             def moduleReferences =  [ new ModuleReference('module1','1'), new ModuleReference('module2','2') ]
             mockDmiModelOperations.getModuleReferences(yangModelCmHandle) >> moduleReferences
@@ -75,10 +82,60 @@ class ModuleSyncServiceSpec extends Specification {
         where: 'the following parameters are used'
             scenario                  | identifiedNewModuleReferences         | newModuleNameContentToMap     | moduleSetTag | existingModuleReferences
             'one new module, new tag' | [new ModuleReference('module1', '1')] | [module1: 'some yang source'] | ''           | []
-            'no new module, new tag'  | []                                    | [:]                           | 'new-tag-1'  | []
+            'no new module, new tag'  | []                                    | [:]                           | 'new-tag'    | []
             'same tag'                | []                                    | [:]                           | 'same-tag'   | [new ModuleReference('module1', '1'), new ModuleReference('module2', '2')]
     }
 
+    def 'Attempt Sync models for a cm handle with exception and #scenario module set tag'() {
+        given: 'a cm handle to be synced'
+            def yangModelCmHandle = createAdvisedCmHandle(moduleSetTag)
+        and: 'the service returns a list of module references when queried with the specified attributes'
+            mockCpsModuleService.getModuleReferencesByAttribute(*_) >> [new ModuleReference('module1', '1')]
+        and: 'exception occurs when try to store result'
+            def testException = new RuntimeException('test')
+            mockCpsModuleService.createSchemaSetFromModules(*_) >> { throw testException }
+        when: 'module sync is triggered'
+            objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+        then: 'the same exception is thrown up'
+            def exceptionThrown = thrown(Exception)
+            assert testException == exceptionThrown
+        and: 'module set tag is removed from processing semaphores only when needed'
+            expectedCallsToRemoveTag * mockModuleSetTagsBeingProcessed.remove('new-tag')
+        where: 'following module set tags are used'
+            scenario  | moduleSetTag || expectedCallsToRemoveTag
+            'with'    | 'new-tag'    || 1
+            'without' | ' '          || 0
+    }
+
+    def 'Sync models for a cm handle with previously cached module set tag.'() {
+        given: 'a cm handle to be synced'
+            def yangModelCmHandle = createAdvisedCmHandle('cached-tag')
+        and: 'The module set tag exist in the private cache'
+            def moduleReferences = [ new ModuleReference('module1','1') ]
+            def cachedModuleDelta = new ModuleDelta(moduleReferences, [:])
+            objectUnderTest.privateModuleSetCache.put('cached-tag', cachedModuleDelta)
+        when: 'module sync is triggered'
+            objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+        then: 'create schema set from module is invoked with correct parameters'
+            1 * mockCpsModuleService.createSchemaSetFromModules(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, 'ch-1', [:], moduleReferences)
+        and: 'anchor is created with the correct parameters'
+            1 * mockCpsAnchorService.createAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, 'ch-1', 'ch-1')
+    }
+
+    def 'Attempt to sync using a module set tag already being processed by a different instance or thread.'() {
+        given: 'a cm handle to be synced'
+            def yangModelCmHandle = createAdvisedCmHandle('duplicateTag')
+        and: 'The module set tag already exist in the processing semaphore set'
+            mockModuleSetTagsBeingProcessed.add('duplicate-processing-tag') > false
+        when: 'module sync is triggered'
+            objectUnderTest.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle)
+        then: 'a ncmp exception is thrown with the relevant details'
+            def exceptionThrown = thrown(NcmpException)
+            assert exceptionThrown.message.contains('duplicateTag')
+            assert exceptionThrown.details.contains('duplicateTag')
+            assert exceptionThrown.details.contains('ch-1')
+    }
+
     def 'Upgrade model for an existing cm handle with Module Set Tag where the modules are #scenario'() {
         given: 'a cm handle being upgraded to module set tag: tag-1'
             def ncmpServiceCmHandle = new NcmpServiceCmHandle()
@@ -113,7 +170,7 @@ class ModuleSyncServiceSpec extends Specification {
             'in database' | [new ModuleReference('module1', '1')]
     }
 
-    def 'upgrade model for a existing cm handle'() {
+    def 'upgrade model for an existing cm handle'() {
         given: 'a cm handle that is ready but locked for upgrade'
             def ncmpServiceCmHandle = new NcmpServiceCmHandle()
             ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder()
@@ -159,4 +216,20 @@ class ModuleSyncServiceSpec extends Specification {
             result == unsupportedOperationException
     }
 
+    def 'Clear module set cache.'() {
+        given: 'something in the module set cache'
+            objectUnderTest.privateModuleSetCache.put('test',new ModuleDelta([],[:]))
+        when: 'the cache is cleared'
+            objectUnderTest.clearPrivateModuleSetCache()
+        then: 'the cache is empty'
+            objectUnderTest.privateModuleSetCache.isEmpty()
+    }
+
+    def createAdvisedCmHandle(moduleSetTag) {
+        def ncmpServiceCmHandle = new NcmpServiceCmHandle()
+        ncmpServiceCmHandle.setCompositeState(new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).build())
+        ncmpServiceCmHandle.cmHandleId = 'ch-1'
+        return YangModelCmHandle.toYangModelCmHandle('some service name', '', '', ncmpServiceCmHandle, moduleSetTag, '', '')
+    }
+
 }
index 8ce1e93..e21c868 100644 (file)
@@ -26,6 +26,7 @@ import ch.qos.logback.classic.Logger
 import ch.qos.logback.classic.spi.ILoggingEvent
 import ch.qos.logback.core.read.ListAppender
 import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
 import com.hazelcast.instance.impl.HazelcastInstanceFactory
 import com.hazelcast.map.IMap
 import org.onap.cps.ncmp.api.inventory.models.CompositeState
@@ -75,6 +76,10 @@ class ModuleSyncTasksSpec extends Specification {
     def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService,
             mockLcmEventsCmHandleStateHandler, moduleSyncStartedOnCmHandles)
 
+    def cleanupSpec() {
+        Hazelcast.getHazelcastInstanceByName('hazelcastInstanceName').shutdown()
+    }
+
     def 'Module Sync ADVISED cm handles.'() {
         given: 'cm handles in an ADVISED state'
             def cmHandle1 = cmHandleAsDataNodeByIdAndState('cm-handle-1', CmHandleState.ADVISED)
index 4c96d6b..c2ecf92 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.cps.ncmp.impl.inventory.sync
 
+import com.hazelcast.collection.ISet
 import com.hazelcast.config.Config
 import com.hazelcast.core.Hazelcast
 import com.hazelcast.map.IMap
@@ -38,13 +39,16 @@ import java.util.concurrent.TimeUnit
 class SynchronizationCacheConfigSpec extends Specification {
 
     @Autowired
-    private BlockingQueue<DataNode> moduleSyncWorkQueue
+    BlockingQueue<DataNode> moduleSyncWorkQueue
 
     @Autowired
-    private IMap<String, Object> moduleSyncStartedOnCmHandles
+    IMap<String, Object> moduleSyncStartedOnCmHandles
 
     @Autowired
-    private IMap<String, Boolean> dataSyncSemaphores
+    IMap<String, Boolean> dataSyncSemaphores
+
+    @Autowired
+    ISet<String> moduleSetTagsBeingProcessed
 
     def cleanupSpec() {
         Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').shutdown()
@@ -57,8 +61,11 @@ class SynchronizationCacheConfigSpec extends Specification {
             assert null != moduleSyncStartedOnCmHandles
         and: 'system is able to create an instance of a map to hold data sync semaphores'
             assert null != dataSyncSemaphores
-        and: 'they have the correct names (in any order)'
-            assert Hazelcast.allHazelcastInstances.name.contains('cps-and-ncmp-hazelcast-instance-test-config')
+        and: 'system is able to create an instance of a set to hold module set tags being processed'
+            assert null != moduleSetTagsBeingProcessed
+        and: 'there is only one instance with the correct name'
+            assert Hazelcast.allHazelcastInstances.size() == 1
+            assert Hazelcast.allHazelcastInstances.name[0] == 'cps-and-ncmp-hazelcast-instance-test-config'
     }
 
     def 'Verify configs for Distributed objects'(){
@@ -103,7 +110,6 @@ class SynchronizationCacheConfigSpec extends Specification {
         then: 'applied properties are reflected'
             assert testConfig.networkConfig.join.kubernetesConfig.enabled
             assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
-
     }
 
     def 'Time to Live Verify for Module Sync Semaphore'() {
index 6f491ba..3368aee 100755 (executable)
@@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
+import io.micrometer.core.annotation.Timed;
 import jakarta.transaction.Transactional;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -186,6 +187,8 @@ public class CpsModulePersistenceServiceImpl implements CpsModulePersistenceServ
     // can occur in case of specific concurrent requests.
     @Retryable(retryFor = DuplicatedYangResourceException.class, maxAttempts = 5, backoff =
         @Backoff(random = true, delay = 200, maxDelay = 2000, multiplier = 2))
+    @Timed(value = "cps.module.persistence.schemaset.store",
+        description = "Time taken to store a schemaset (list of module references")
     public void storeSchemaSetFromModules(final String dataspaceName, final String schemaSetName,
                                           final Map<String, String> newModuleNameToContentMap,
                                           final Collection<ModuleReference> allModuleReferences) {
index a600b22..4063a7f 100644 (file)
@@ -171,8 +171,8 @@ public class CpsModuleServiceImpl implements CpsModuleService {
         return cpsModulePersistenceService.identifyNewModuleReferences(moduleReferencesToCheck);
     }
 
-    @Timed(value = "cps.module.service.module.reference.query",
-            description = "Time taken to query list of module references")
+    @Timed(value = "cps.module.service.module.reference.query.by.attribute",
+            description = "Time taken to query list of module references by attribute (e.g moduleSetTag)")
     @Override
     public Collection<ModuleReference> getModuleReferencesByAttribute(final String dataspaceName,
                                                                       final String anchorName,
index 759eccd..02a10cf 100644 (file)
@@ -21,6 +21,7 @@
 
 package org.onap.cps.integration.base
 
+import com.hazelcast.collection.ISet
 import okhttp3.mockwebserver.MockWebServer
 import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDataService
@@ -37,13 +38,13 @@ import org.onap.cps.ncmp.impl.data.NetworkCmProxyQueryService
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
 import org.onap.cps.ncmp.impl.inventory.ParameterizedCmHandleQueryService
 import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
+import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncService
 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
 import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
 import org.onap.cps.ri.repository.DataspaceRepository
 import org.onap.cps.ri.utils.SessionManager
 import org.onap.cps.spi.exceptions.DataspaceNotFoundException
 import org.onap.cps.spi.model.DataNode
-import org.onap.cps.utils.ContentType
 import org.onap.cps.utils.JsonObjectMapper
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.beans.factory.annotation.Value
@@ -61,13 +62,8 @@ import spock.lang.Specification
 import spock.util.concurrent.PollingConditions
 
 import java.time.OffsetDateTime
-import java.time.format.DateTimeFormatter
 import java.util.concurrent.BlockingQueue
 
-import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME
-import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
-import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT
-
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = [CpsDataspaceService])
 @Testcontainers
 @EnableAutoConfiguration
@@ -120,6 +116,9 @@ abstract class CpsIntegrationSpecBase extends Specification {
     @Autowired
     ModuleSyncWatchdog moduleSyncWatchdog
 
+    @Autowired
+    ModuleSyncService moduleSyncService
+
     @Autowired
     BlockingQueue<DataNode> moduleSyncWorkQueue
 
@@ -132,6 +131,8 @@ abstract class CpsIntegrationSpecBase extends Specification {
     @Autowired
     AlternateIdMatcher alternateIdMatcher
 
+    @Autowired
+    ISet<String> moduleSetTagsBeingProcessed
 
     @Value('${ncmp.policy-executor.server.port:8080}')
     private String policyServerPort;
@@ -174,13 +175,13 @@ abstract class CpsIntegrationSpecBase extends Specification {
 
         DMI1_URL = String.format("http://%s:%s", mockDmiServer1.getHostName(), mockDmiServer1.getPort())
         DMI2_URL = String.format("http://%s:%s", mockDmiServer2.getHostName(), mockDmiServer2.getPort())
-
     }
 
     def cleanup() {
         mockDmiServer1.shutdown()
         mockDmiServer2.shutdown()
         mockPolicyServer.shutdown()
+        moduleSetTagsBeingProcessed.clear()
     }
 
     def static readResourceDataFile(filename) {
@@ -262,11 +263,16 @@ abstract class CpsIntegrationSpecBase extends Specification {
         networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: [cmHandleToCreate]))
     }
 
-    def registerSequenceOfCmHandlesWithoutWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles) {
+    def registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles, offset) {
         def cmHandles = []
+        def id = offset
+        def moduleReferences = (1..200).collect { moduleSetTag + '_Module_' + it.toString() }
         (1..numberOfCmHandles).each {
-            def cmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-'+it, moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID)
-            cmHandles.add(cmHandle)
+            def ncmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-'+id, moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID)
+            cmHandles.add(ncmpServiceCmHandle)
+            dmiDispatcher1.moduleNamesPerCmHandleId[ncmpServiceCmHandle.cmHandleId] = moduleReferences
+            dmiDispatcher2.moduleNamesPerCmHandleId[ncmpServiceCmHandle.cmHandleId] = moduleReferences
+            id++
         }
         networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: cmHandles))
     }
@@ -279,9 +285,10 @@ abstract class CpsIntegrationSpecBase extends Specification {
         networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
     }
 
-    def deregisterSequenceOfCmHandles(dmiPlugin, numberOfCmHandles) {
+    def deregisterSequenceOfCmHandles(dmiPlugin, numberOfCmHandles, offset) {
         def cmHandleIds = []
-        (1..numberOfCmHandles).each { cmHandleIds.add('ch-'+it) }
+        def id = offset
+        (1..numberOfCmHandles).each { cmHandleIds.add('ch-' + id++) }
         networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
     }
 
index 6444937..a5e3daf 100644 (file)
@@ -33,54 +33,55 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
 
     NetworkCmProxyInventoryFacade objectUnderTest
 
-    static final CM_HANDLE_ID = 'ch-1'
-    static final CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG = 'ch-2'
+    def cmHandleId = 'ch-1'
+    def cmHandleIdWithExistingModuleSetTag = 'ch-2'
 
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
+        moduleSyncService.clearPrivateModuleSetCache()
     }
 
     def 'Upgrade CM-handle with new moduleSetTag or no moduleSetTag.'() {
         given: 'a CM-handle is created with expected initial modules: M1 and M2'
-            dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
-            registerCmHandle(DMI1_URL, CM_HANDLE_ID, initialModuleSetTag)
-            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+            dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+            registerCmHandle(DMI1_URL, cmHandleId, initialModuleSetTag)
+            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
 
         when: "the CM-handle is upgraded with given moduleSetTag '${updatedModuleSetTag}'"
-            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag)
+            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: updatedModuleSetTag)
             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(
                     new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
 
         then: 'registration gives successful response'
-            assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(CM_HANDLE_ID)]
+            assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
 
         and: 'CM-handle is in LOCKED state due to MODULE_UPGRADE'
-            def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID)
+            def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(cmHandleId)
             assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
             assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_UPGRADE
             assert cmHandleCompositeState.lockReason.details == "Upgrade to ModuleSetTag: ${updatedModuleSetTag}"
 
         when: 'DMI will return different modules for upgrade: M1 and M3'
-            dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M3']
+            dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M3']
 
         and: 'the module sync watchdog is triggered twice'
             2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
 
         then: 'CM-handle goes to READY state'
             new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
-                assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
+                assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState
             })
 
         and: 'the CM-handle has expected moduleSetTag'
-            assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == updatedModuleSetTag
+            assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == updatedModuleSetTag
 
         and: 'CM-handle has expected updated modules: M1 and M3'
-            assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+            assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
 
         cleanup: 'deregister CM-handle'
-            deregisterCmHandle(DMI1_URL, CM_HANDLE_ID)
+            deregisterCmHandle(DMI1_URL, cmHandleId)
 
-        where:
+        where: 'following module set tags are used'
             initialModuleSetTag | updatedModuleSetTag
             NO_MODULE_SET_TAG   | NO_MODULE_SET_TAG
             NO_MODULE_SET_TAG   | 'new'
@@ -90,39 +91,39 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
 
     def 'Upgrade CM-handle with existing moduleSetTag.'() {
         given: 'DMI will return modules for registration'
-            dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
-            dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG] = ['M1', 'M3']
+            dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+            dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleIdWithExistingModuleSetTag] = ['M1', 'M3']
         and: "an existing CM-handle handle with moduleSetTag '${updatedModuleSetTag}'"
-            registerCmHandle(DMI1_URL, CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG, updatedModuleSetTag)
-            assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG).moduleName.sort()
+            registerCmHandle(DMI1_URL, cmHandleIdWithExistingModuleSetTag, updatedModuleSetTag)
+            assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleIdWithExistingModuleSetTag).moduleName.sort()
         and: "a CM-handle with moduleSetTag '${initialModuleSetTag}' which will be upgraded"
-            registerCmHandle(DMI1_URL, CM_HANDLE_ID, initialModuleSetTag)
-            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+            registerCmHandle(DMI1_URL, cmHandleId, initialModuleSetTag)
+            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
 
         when: "CM-handle is upgraded to moduleSetTag '${updatedModuleSetTag}'"
-            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag)
+            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: updatedModuleSetTag)
             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(
                     new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
 
         then: 'registration gives successful response'
-            assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(CM_HANDLE_ID)]
+            assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
 
         and: 'the module sync watchdog is triggered twice'
             2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
 
         and: 'CM-handle goes to READY state'
             new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
-                assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
+                assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState
             })
 
         and: 'the CM-handle has expected moduleSetTag'
-            assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == updatedModuleSetTag
+            assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == updatedModuleSetTag
 
         and: 'CM-handle has expected updated modules: M1 and M3'
-            assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+            assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
 
         cleanup: 'deregister CM-handle'
-            deregisterCmHandles(DMI1_URL, [CM_HANDLE_ID, CM_HANDLE_ID_WITH_EXISTING_MODULE_SET_TAG])
+            deregisterCmHandles(DMI1_URL, [cmHandleId, cmHandleIdWithExistingModuleSetTag])
 
         where:
             initialModuleSetTag | updatedModuleSetTag
@@ -132,37 +133,37 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
 
     def 'Skip upgrade of CM-handle with same moduleSetTag as before.'() {
         given: 'an existing CM-handle with expected initial modules: M1 and M2'
-            dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
-            registerCmHandle(DMI1_URL, CM_HANDLE_ID, 'same')
-            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+            dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+            registerCmHandle(DMI1_URL, cmHandleId, 'same')
+            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
 
         when: 'CM-handle is upgraded with the same moduleSetTag'
-            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'same')
+            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: 'same')
             objectUnderTest.updateDmiRegistration(
                     new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
 
         then: 'CM-handle remains in READY state'
-            assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
+            assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState
 
         and: 'the CM-handle has same moduleSetTag as before'
-            assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == 'same'
+            assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == 'same'
 
         then: 'CM-handle has same modules as before: M1 and M2'
-            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(CM_HANDLE_ID).moduleName.sort()
+            assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort()
 
         cleanup: 'deregister CM-handle'
-            deregisterCmHandle(DMI1_URL, CM_HANDLE_ID)
+            deregisterCmHandle(DMI1_URL, cmHandleId)
     }
 
     def 'Upgrade of CM-handle fails due to DMI error.'() {
         given: 'a CM-handle exists'
-            dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M2']
-            registerCmHandle(DMI1_URL, CM_HANDLE_ID, 'oldTag')
+            dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+            registerCmHandle(DMI1_URL, cmHandleId, 'oldTag')
         and: 'DMI is not available for upgrade'
             dmiDispatcher1.isAvailable = false
 
         when: 'the CM-handle is upgraded'
-            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'newTag')
+            def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [cmHandleId], moduleSetTag: 'newTag')
             objectUnderTest.updateDmiRegistration(
                     new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
 
@@ -171,16 +172,16 @@ class CmHandleUpgradeSpec extends CpsIntegrationSpecBase {
 
         then: 'CM-handle goes to LOCKED state with reason MODULE_UPGRADE_FAILED'
             new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
-                def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID)
+                def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(cmHandleId)
                 assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
                 assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_UPGRADE_FAILED
             })
 
         and: 'the CM-handle has same moduleSetTag as before'
-            assert objectUnderTest.getNcmpServiceCmHandle(CM_HANDLE_ID).moduleSetTag == 'oldTag'
+            assert objectUnderTest.getNcmpServiceCmHandle(cmHandleId).moduleSetTag == 'oldTag'
 
         cleanup: 'deregister CM-handle'
-            deregisterCmHandle(DMI1_URL, CM_HANDLE_ID)
+            deregisterCmHandle(DMI1_URL, cmHandleId)
     }
 
 }
index e0bb437..963bc1f 100644 (file)
 
 package org.onap.cps.integration.functional.ncmp
 
+import io.micrometer.core.instrument.MeterRegistry
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
+import org.springframework.beans.factory.annotation.Autowired
+import spock.util.concurrent.PollingConditions
 
 import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
 
 class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
 
     ModuleSyncWatchdog objectUnderTest
 
+    @Autowired
+    MeterRegistry meterRegistry
+
     def executorService = Executors.newFixedThreadPool(2)
-    def SYNC_SAMPLE_SIZE = 100
+    def PARALLEL_SYNC_SAMPLE_SIZE = 100
 
     def setup() {
         objectUnderTest = moduleSyncWatchdog
-        registerSequenceOfCmHandlesWithoutWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, SYNC_SAMPLE_SIZE)
     }
 
     def cleanup() {
         try {
-            deregisterSequenceOfCmHandles(DMI1_URL, SYNC_SAMPLE_SIZE)
+            deregisterSequenceOfCmHandles(DMI1_URL, PARALLEL_SYNC_SAMPLE_SIZE, 1)
             moduleSyncWorkQueue.clear()
         } finally {
             executorService.shutdownNow()
@@ -47,15 +53,60 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
     }
 
     def 'Watchdog is disabled for test.'() {
+        given:
+            registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
         when: 'wait a while but less then the initial delay of 10 minutes'
             Thread.sleep(3000)
         then: 'the work queue remains empty'
             assert moduleSyncWorkQueue.isEmpty()
     }
 
+    def 'CPS-2478 Highlight module sync inefficiencies.'() {
+        given: 'register 250 cm handles with module set tag cps-2478-A'
+            def numberOfTags = 2
+            def cmHandlesPerTag = 250
+            def totalCmHandles = numberOfTags * cmHandlesPerTag
+            def offset = 1
+            registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-A', cmHandlesPerTag, offset)
+        and: 'register anther 250 cm handles with module set tag cps-2478-B'
+            offset += cmHandlesPerTag
+            registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'cps-2478-B', cmHandlesPerTag, offset)
+        and: 'clear any previous instrumentation'
+            meterRegistry.clear()
+        when: 'sync all advised cm handles'
+            objectUnderTest.moduleSyncAdvisedCmHandles()
+            Thread.sleep(100)
+        then: 'retry until all schema sets are stored in db (1 schema set  for each cm handle)'
+            def dbSchemaSetStorageTimer = meterRegistry.get('cps.module.persistence.schemaset.store').timer()
+            new PollingConditions().within(10, () -> {
+                objectUnderTest.moduleSyncAdvisedCmHandles()
+                Thread.sleep(100)
+                assert dbSchemaSetStorageTimer.count() >= 500
+            })
+        then: 'wait till at least 5 batches of state updates are done (often more because of retries of locked cm handles)'
+            def dbStateUpdateTimer = meterRegistry.get('cps.ncmp.cmhandle.state.update.batch').timer()
+            new PollingConditions().within(10, () -> {
+                assert dbStateUpdateTimer.count() >= 5
+            })
+        and: 'the db has been queried for tags exactly 2 times.'
+            def dbModuleQueriesTimer = meterRegistry.get('cps.module.service.module.reference.query.by.attribute').timer()
+            assert dbModuleQueriesTimer.count() == 2
+        and: 'exactly 2 calls to DMI to get module references'
+            def dmiModuleRetrievalTimer = meterRegistry.get('cps.ncmp.inventory.module.references.from.dmi').timer()
+            assert dmiModuleRetrievalTimer.count() == 2
+        and: 'log the relevant instrumentation'
+            logInstrumentation(dbModuleQueriesTimer,    'query module references')
+            logInstrumentation(dmiModuleRetrievalTimer, 'get modules from DMI   ')
+            logInstrumentation(dbSchemaSetStorageTimer, 'store schema sets      ')
+            logInstrumentation(dbStateUpdateTimer,      'batch state updates    ')
+        cleanup: 'remove all cm handles'
+            deregisterSequenceOfCmHandles(DMI1_URL, totalCmHandles, 1)
+    }
+
     def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
         // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
         given: 'the queue is empty at the start'
+            registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
             assert moduleSyncWorkQueue.isEmpty()
         when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
             objectUnderTest.populateWorkQueueIfNeeded()
@@ -63,12 +114,13 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
         and: 'wait a little (to give all threads time to complete their task)'
             Thread.sleep(50)
         then: 'the queue size is exactly the sample size'
-            assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE
+            assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
     }
 
     def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
         // This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed
         given: 'the queue is empty at the start'
+            registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, PARALLEL_SYNC_SAMPLE_SIZE, 1)
             assert moduleSyncWorkQueue.isEmpty()
         when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
             objectUnderTest.populateWorkQueueIfNeeded()
@@ -76,7 +128,12 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
         and: 'wait a little (to give all threads time to complete their task)'
             Thread.sleep(50)
         then: 'the queue size is exactly the sample size'
-            assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE
+            assert moduleSyncWorkQueue.size() == PARALLEL_SYNC_SAMPLE_SIZE
+    }
+
+    def logInstrumentation(timer, description) {
+        System.out.println('*** CPS-2478, ' + description + ' : ' + timer.count()+ ' times, total ' + timer.totalTime(TimeUnit.MILLISECONDS) + ' ms')
+        return true
     }
 
     def populateQueueWithoutDelay = () -> {
index 56d4bfa..f897393 100644 (file)
@@ -43,9 +43,7 @@ class PolicyExecutorIntegrationSpec extends CpsIntegrationSpecBase {
     }
 
     def cleanup() {
-        deregisterCmHandle(DMI1_URL, 'ch-1')
-        deregisterCmHandle(DMI1_URL, 'ch-2')
-        deregisterCmHandle(DMI1_URL, 'ch-3')
+        deregisterSequenceOfCmHandles(DMI1_URL, 3, 1)
     }
 
     def 'Policy Executor create request with #scenario.'() {
diff --git a/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java b/integration-test/src/test/java/org/onap/cps/integration/MicroMeterTestConfig.java
new file mode 100644 (file)
index 0000000..3b26f42
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the 'License');
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.integration;
+
+import io.micrometer.core.aop.TimedAspect;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MicroMeterTestConfig {
+    @Bean
+    public MeterRegistry meterRegistry() {
+        return new SimpleMeterRegistry(); // Use a simple in-memory registry for testing
+    }
+
+    @Bean
+    public TimedAspect timedAspect(final MeterRegistry meterRegistry) {
+        return new TimedAspect(meterRegistry);
+    }
+}
+
index b786a3d..30598df 100644 (file)
@@ -191,7 +191,7 @@ ncmp:
 
   modules-sync-watchdog:
     async-executor:
-      parallelism-level: 1
+      parallelism-level: 2
 
   model-loader:
     maximum-attempt-count: 20