[Module Sync] Store CM-handle IDs in work queue 75/139675/3
authordanielhanrahan <daniel.hanrahan@est.tech>
Fri, 1 Nov 2024 17:36:42 +0000 (17:36 +0000)
committerdanielhanrahan <daniel.hanrahan@est.tech>
Fri, 6 Dec 2024 12:58:42 +0000 (12:58 +0000)
This fixes bug CPS-2474, handling various edge cases, such as
CM handles being deleted during module sync.

- Change moduleSyncWorkQueue to store CmHandleId instead of DataNode.
- Freshly fetch Cm Handles in module sync task, so latest CM-handle
  state is used, and only process ADVISED CM handles in module sync.

Issue-ID: CPS-2474
Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech>
Change-Id: I53d5796c56014a2bfbe5b1c3f17d3991e4feef53

12 files changed:
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtils.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleQueryServiceImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleOperationsUtilsSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy

index bb9ad50..2ba1b2a 100644 (file)
@@ -59,12 +59,12 @@ public interface CmHandleQueryService {
                                                   boolean outputAlternateId);
 
     /**
-     * Method which returns cm handles by the cm handles state.
+     * Method which returns cm handle ids by the cm handles state.
      *
      * @param cmHandleState cm handle state
-     * @return a list of data nodes representing the cm handles.
+     * @return a list of cm handle ids.
      */
-    Collection<DataNode> queryCmHandlesByState(CmHandleState cmHandleState);
+    Collection<String> queryCmHandleIdsByState(CmHandleState cmHandleState);
 
     /**
      * Method to return data nodes with ancestor representing the cm handles.
index 96fa03d..5610013 100644 (file)
@@ -21,7 +21,6 @@
 
 package org.onap.cps.ncmp.impl.inventory;
 
-import static org.onap.cps.api.parameters.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS;
 import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
 import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME;
 import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR;
@@ -45,6 +44,7 @@ import org.onap.cps.ncmp.impl.inventory.models.CmHandleState;
 import org.onap.cps.ncmp.impl.inventory.models.ModelledDmiServiceLeaves;
 import org.onap.cps.ncmp.impl.inventory.models.PropertyType;
 import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelCacheConfig;
+import org.onap.cps.ncmp.impl.utils.YangDataConverter;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Component;
 
@@ -87,14 +87,18 @@ public class CmHandleQueryServiceImpl implements CmHandleQueryService {
     }
 
     @Override
-    public Collection<DataNode> queryCmHandlesByState(final CmHandleState cmHandleState) {
-        return queryCmHandleAncestorsByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]",
-            INCLUDE_ALL_DESCENDANTS);
+    public Collection<String> queryCmHandleIdsByState(final CmHandleState cmHandleState) {
+        final Collection<DataNode> cmHandlesAsDataNodes =
+                queryNcmpRegistryByCpsPath("//state[@cm-handle-state='" + cmHandleState + "']", OMIT_DESCENDANTS);
+        return cmHandlesAsDataNodes.stream()
+                .map(DataNode::getXpath)
+                .map(YangDataConverter::extractCmHandleIdFromXpath)
+                .toList();
     }
 
     @Override
     public Collection<DataNode> queryNcmpRegistryByCpsPath(final String cpsPath,
-                                                     final FetchDescendantsOption fetchDescendantsOption) {
+                                                           final FetchDescendantsOption fetchDescendantsOption) {
         return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, cpsPath,
                 fetchDescendantsOption);
     }
@@ -232,5 +236,3 @@ public class CmHandleQueryServiceImpl implements CmHandleQueryService {
                 xpath, OMIT_DESCENDANTS).iterator().next();
     }
 }
-
-
index 8bf13a0..2895d9b 100644 (file)
@@ -70,13 +70,10 @@ public class ModuleOperationsUtils {
     /**
      * Query data nodes for cm handles with an "ADVISED" cm handle state.
      *
-     * @return cm handles (data nodes) in ADVISED state (empty list if none found)
+     * @return cm handle ids in ADVISED state (empty list if none found)
      */
-    public Collection<DataNode> getAdvisedCmHandles() {
-        final Collection<DataNode> advisedCmHandlesAsDataNodes =
-            cmHandleQueryService.queryCmHandlesByState(CmHandleState.ADVISED);
-        log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size());
-        return advisedCmHandlesAsDataNodes;
+    public Collection<String> getAdvisedCmHandleIds() {
+        return cmHandleQueryService.queryCmHandleIdsByState(CmHandleState.ADVISED);
     }
 
     /**
index 81656a4..c97b284 100644 (file)
@@ -28,14 +28,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.model.DataNode;
+import org.onap.cps.api.exceptions.DataNodeNotFoundException;
 import org.onap.cps.ncmp.api.inventory.models.CompositeState;
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
 import org.onap.cps.ncmp.impl.inventory.models.CmHandleState;
 import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
 import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler;
-import org.onap.cps.ncmp.impl.utils.YangDataConverter;
 import org.springframework.stereotype.Component;
 
 @RequiredArgsConstructor
@@ -51,21 +50,29 @@ public class ModuleSyncTasks {
     /**
      * Perform module sync on a batch of cm handles.
      *
-     * @param cmHandlesAsDataNodes         a batch of Data nodes representing cm handles to perform module sync on
+     * @param cmHandleIds                  a batch of cm handle ids to perform module sync on
      * @param batchCounter                 the number of batches currently being processed, will be decreased when
      *                                     task is finished or fails
      * @return completed future to handle post-processing
      */
-    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes,
+    public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
                                                      final AtomicInteger batchCounter) {
-        final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle =
-            new HashMap<>(cmHandlesAsDataNodes.size());
+        final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
         try {
-            cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> {
-                final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode);
-                final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle);
-                cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState);
-            });
+            for (final String cmHandleId : cmHandleIds) {
+                try {
+                    final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId);
+                    if (isCmHandleInAdvisedState(yangModelCmHandle)) {
+                        final CmHandleState newCmHandleState = processCmHandle(yangModelCmHandle);
+                        cmHandleStatePerCmHandle.put(yangModelCmHandle, newCmHandleState);
+                    } else {
+                        log.warn("Skipping module sync for CM handle '{}' as it is in {} state", cmHandleId,
+                                yangModelCmHandle.getCompositeState().getCmHandleState().name());
+                    }
+                } catch (final DataNodeNotFoundException dataNodeNotFoundException) {
+                    log.warn("Skipping module sync for CM handle '{}' as it does not exist", cmHandleId);
+                }
+            }
         } finally {
             batchCounter.getAndDecrement();
             lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
@@ -96,7 +103,7 @@ public class ModuleSyncTasks {
     }
 
     private CmHandleState processCmHandle(final YangModelCmHandle yangModelCmHandle) {
-        final CompositeState compositeState = inventoryPersistence.getCmHandleState(yangModelCmHandle.getId());
+        final CompositeState compositeState = yangModelCmHandle.getCompositeState();
         final boolean inUpgrade = ModuleOperationsUtils.inUpgradeOrUpgradeFailed(compositeState);
         try {
             if (inUpgrade) {
@@ -105,27 +112,25 @@ public class ModuleSyncTasks {
                 moduleSyncService.deleteSchemaSetIfExists(yangModelCmHandle.getId());
                 moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
             }
-            yangModelCmHandle.getCompositeState().setLockReason(null);
+            compositeState.setLockReason(null);
             return CmHandleState.READY;
         } catch (final Exception e) {
             log.warn("Processing of {} module failed due to reason {}.", yangModelCmHandle.getId(), e.getMessage());
-            final LockReasonCategory lockReasonCategory = inUpgrade ? LockReasonCategory.MODULE_UPGRADE_FAILED
-                : LockReasonCategory.MODULE_SYNC_FAILED;
-            moduleOperationsUtils.updateLockReasonWithAttempts(compositeState,
-                lockReasonCategory, e.getMessage());
-            setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+            final LockReasonCategory lockReasonCategory = inUpgrade
+                    ? LockReasonCategory.MODULE_UPGRADE_FAILED
+                    : LockReasonCategory.MODULE_SYNC_FAILED;
+            moduleOperationsUtils.updateLockReasonWithAttempts(compositeState, lockReasonCategory, e.getMessage());
             return CmHandleState.LOCKED;
         }
     }
 
-    private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
-                                        final CompositeState.LockReason lockReason) {
-        advisedCmHandle.getCompositeState().setLockReason(lockReason);
-    }
-
     private void removeResetCmHandleFromModuleSyncMap(final String resetCmHandleId) {
         if (moduleSyncStartedOnCmHandles.remove(resetCmHandleId) != null) {
             log.info("{} removed from in progress map", resetCmHandleId);
         }
     }
+
+    private static boolean isCmHandleInAdvisedState(final YangModelCmHandle yangModelCmHandle) {
+        return yangModelCmHandle.getCompositeState().getCmHandleState() == CmHandleState.ADVISED;
+    }
 }
index 42c0d8f..74bef43 100644 (file)
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.Lock;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.model.DataNode;
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
 import org.onap.cps.ncmp.impl.utils.Sleeper;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -43,7 +42,7 @@ import org.springframework.stereotype.Service;
 public class ModuleSyncWatchdog {
 
     private final ModuleOperationsUtils moduleOperationsUtils;
-    private final BlockingQueue<DataNode> moduleSyncWorkQueue;
+    private final BlockingQueue<String> moduleSyncWorkQueue;
     private final IMap<String, Object> moduleSyncStartedOnCmHandles;
     private final ModuleSyncTasks moduleSyncTasks;
     private final AsyncTaskExecutor asyncTaskExecutor;
@@ -70,7 +69,7 @@ public class ModuleSyncWatchdog {
         populateWorkQueueIfNeeded();
         while (!moduleSyncWorkQueue.isEmpty()) {
             if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
-                final Collection<DataNode> nextBatch = prepareNextBatch();
+                final Collection<String> nextBatch = prepareNextBatch();
                 log.info("Processing module sync batch of {}. {} batch(es) active.",
                     nextBatch.size(), batchCounter.get());
                 if (!nextBatch.isEmpty()) {
@@ -104,14 +103,14 @@ public class ModuleSyncWatchdog {
     }
 
     private void populateWorkQueue() {
-        final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles();
-        if (advisedCmHandles.isEmpty()) {
+        final Collection<String> advisedCmHandleIds = moduleOperationsUtils.getAdvisedCmHandleIds();
+        if (advisedCmHandleIds.isEmpty()) {
             log.debug("No advised CM handles found in DB.");
         } else {
-            log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size());
-            advisedCmHandles.forEach(advisedCmHandle -> {
-                final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id"));
-                if (moduleSyncWorkQueue.offer(advisedCmHandle)) {
+            log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.",
+                    advisedCmHandleIds.size());
+            advisedCmHandleIds.forEach(cmHandleId -> {
+                if (moduleSyncWorkQueue.offer(cmHandleId)) {
                     log.info("CM handle {} added to the work queue.", cmHandleId);
                 } else {
                     log.warn("Failed to add CM handle {} to the work queue.", cmHandleId);
@@ -133,13 +132,12 @@ public class ModuleSyncWatchdog {
         }
     }
 
-    private Collection<DataNode> prepareNextBatch() {
-        final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
-        final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+    private Collection<String> prepareNextBatch() {
+        final Collection<String> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+        final Collection<String> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
         moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE);
         log.info("nextBatchCandidates size : {}", nextBatchCandidates.size());
-        for (final DataNode batchCandidate : nextBatchCandidates) {
-            final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
+        for (final String cmHandleId : nextBatchCandidates) {
             final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP.equals(
                     moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP,
                             SynchronizationCacheConfig.MODULE_SYNC_STARTED_TTL_SECS, TimeUnit.SECONDS));
@@ -147,7 +145,7 @@ public class ModuleSyncWatchdog {
                 log.info("module sync for {} already in progress by other instance", cmHandleId);
             } else {
                 log.info("Adding cmHandle : {} to current batch", cmHandleId);
-                nextBatch.add(batchCandidate);
+                nextBatch.add(cmHandleId);
             }
         }
         log.info("nextBatch size : {}", nextBatch.size());
index 9cf29aa..671e791 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START========================================================
- *  Copyright (C) 2022-2023 Nordix Foundation
+ *  Copyright (C) 2022-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -28,7 +28,6 @@ import com.hazelcast.map.IMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.locks.Lock;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.api.model.DataNode;
 import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -53,10 +52,10 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
     /**
      * Module Sync Distributed Queue Instance.
      *
-     * @return queue of cm handles (data nodes) that need module sync
+     * @return queue of cm handle ids that need module sync
      */
     @Bean
-    public BlockingQueue<DataNode> moduleSyncWorkQueue() {
+    public BlockingQueue<String> moduleSyncWorkQueue() {
         return getOrCreateHazelcastInstance(commonQueueConfig).getQueue("moduleSyncWorkQueue");
     }
 
index be99479..d19081c 100644 (file)
@@ -27,7 +27,6 @@ import org.onap.cps.impl.utils.CpsValidator
 import org.onap.cps.ncmp.api.inventory.models.TrustLevel
 import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
 import org.onap.cps.api.model.DataNode
-import spock.lang.Shared
 import spock.lang.Specification
 
 import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME
@@ -40,17 +39,14 @@ class CmHandleQueryServiceImplSpec extends Specification {
 
     def mockCpsQueryService = Mock(CpsQueryService)
     def mockCpsDataService = Mock(CpsDataService)
-
     def trustLevelPerDmiPlugin = [:]
-
     def trustLevelPerCmHandleId = [ 'PNFDemo': TrustLevel.COMPLETE, 'PNFDemo2': TrustLevel.NONE, 'PNFDemo4': TrustLevel.NONE ]
-
     def mockCpsValidator = Mock(CpsValidator)
 
     def objectUnderTest = new CmHandleQueryServiceImpl(mockCpsDataService, mockCpsQueryService, trustLevelPerDmiPlugin, trustLevelPerCmHandleId, mockCpsValidator)
 
-    @Shared
-    def static sampleDataNodes = [new DataNode()]
+    def static sampleDataNodes = [new DataNode(xpath: "/dmi-registry/cm-handles[@id='ch-1']"),
+                                  new DataNode(xpath: "/dmi-registry/cm-handles[@id='ch-2']")]
 
     def dataNodeWithPrivateField = '//additional-properties[@name=\"Contact3\" and @value=\"newemailforstore3@bookstore.com\"]/ancestor::cm-handles'
 
@@ -117,16 +113,16 @@ class CmHandleQueryServiceImplSpec extends Specification {
             result.size() == 1
     }
 
-    def 'Get CmHandles by it\'s state.'() {
+    def 'Get Ids of CmHandles by state.'() {
         given: 'a cm handle state to query'
             def cmHandleState = CmHandleState.ADVISED
         and: 'the persistence service returns a list of data nodes'
             mockCpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
-                '//state[@cm-handle-state="ADVISED"]/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS) >> sampleDataNodes
+                "//state[@cm-handle-state='ADVISED']", OMIT_DESCENDANTS) >> sampleDataNodes
         when: 'cm handles are fetched by state'
-            def result = objectUnderTest.queryCmHandlesByState(cmHandleState)
+            def result = objectUnderTest.queryCmHandleIdsByState(cmHandleState)
         then: 'the returned result matches the result from the persistence service'
-            assert result == sampleDataNodes
+            assert result.toSet() == ['ch-1', 'ch-2'].toSet()
     }
 
     def 'Check the state of a cmHandle when #scenario.'() {
index de5f7e2..f116e0e 100644 (file)
@@ -79,15 +79,15 @@ class ModuleOperationsUtilsSpec extends Specification{
 
     def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
         given: 'the inventory persistence service returns a collection of data nodes'
-            mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
-        when: 'get advised cm handles are fetched'
-            def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles()
-        then: 'the returned data node collection is the correct size'
-            yangModelCmHandles.size() == expectedDataNodeSize
+            mockCmHandleQueries.queryCmHandleIdsByState(CmHandleState.ADVISED) >> cmHandleIds
+        when: 'advised cm handle ids are fetched'
+            def advisedCmHandleIds = objectUnderTest.getAdvisedCmHandleIds()
+        then: 'the expected cm handle ids are returned'
+            advisedCmHandleIds == cmHandleIds
         where: 'the following scenarios are used'
-            scenario         | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
-            'exists'         | [dataNode]         || 1                                   | 1
-            'does not exist' | []                 || 0                                   | 0
+            scenario         | cmHandleIds
+            'exists'         | ['cm-handle-123']
+            'does not exist' | []
     }
 
     def 'Update Lock Reason, Details and Attempts where lock reason #scenario'() {
index 5e2162e..97c2488 100644 (file)
@@ -36,9 +36,7 @@ import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
 import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler
 import org.onap.cps.api.exceptions.DataNodeNotFoundException
-import org.onap.cps.api.model.DataNode
 import org.slf4j.LoggerFactory
-import spock.lang.Ignore
 import spock.lang.Specification
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -82,12 +80,13 @@ class ModuleSyncTasksSpec extends Specification {
 
     def 'Module Sync ADVISED cm handles.'() {
         given: 'cm handles in an ADVISED state'
-            def cmHandle1 = cmHandleAsDataNodeByIdAndState('cm-handle-1', CmHandleState.ADVISED)
-            def cmHandle2 = cmHandleAsDataNodeByIdAndState('cm-handle-2', CmHandleState.ADVISED)
-        and: 'the inventory persistence cm handle returns a ADVISED state for the any handle'
-            mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
+            def cmHandle1 = cmHandleByIdAndState('cm-handle-1', CmHandleState.ADVISED)
+            def cmHandle2 = cmHandleByIdAndState('cm-handle-2', CmHandleState.ADVISED)
+        and: 'the inventory persistence cm handle returns a ADVISED state for the handles'
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle-1') >> cmHandle1
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle-2') >> cmHandle2
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2'], batchCount)
         then: 'module sync service deletes schemas set of each cm handle if it already exists'
             1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1')
             1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2')
@@ -103,18 +102,17 @@ class ModuleSyncTasksSpec extends Specification {
     }
 
     def 'Handle CM handle failure during #scenario and log MODULE_UPGRADE lock reason'() {
-        given: 'a CM handle in LOCKED state with a specific lock reason'
-            def cmHandle = cmHandleAsDataNodeByIdAndState('cm-handle', CmHandleState.LOCKED)
-            def expectedCmHandleState = new CompositeState(cmHandleState: CmHandleState.LOCKED, lockReason: CompositeState
-                    .LockReason.builder().lockReasonCategory(lockReasonCategory).details(lockReasonDetails).build())
-            1 * mockInventoryPersistence.getCmHandleState('cm-handle') >> expectedCmHandleState
+        given: 'a CM handle in ADVISED state with a specific lock reason'
+            def cmHandle = cmHandleByIdAndState('cm-handle', CmHandleState.ADVISED)
+            cmHandle.compositeState.lockReason = CompositeState.LockReason.builder().lockReasonCategory(lockReasonCategory).details(lockReasonDetails).build()
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle') >> cmHandle
         and: 'module sync service attempts to sync/upgrade the CM handle and throws an exception'
             mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { throw new Exception('some exception') }
             mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { throw new Exception('some exception') }
         when: 'module sync is executed'
-            objectUnderTest.performModuleSync([cmHandle], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle'], batchCount)
         then: 'lock reason is updated with number of attempts'
-            1 * mockSyncUtils.updateLockReasonWithAttempts(expectedCmHandleState, expectedLockReasonCategory, 'some exception')
+            1 * mockSyncUtils.updateLockReasonWithAttempts(_, expectedLockReasonCategory, 'some exception')
         and: 'the state handler is called to update the state to LOCKED'
             1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
                 assertBatch(args, ['cm-handle'], CmHandleState.LOCKED)
@@ -128,25 +126,27 @@ class ModuleSyncTasksSpec extends Specification {
             'module upgrade' | MODULE_UPGRADE        | 'Upgrade in progress'                          || MODULE_UPGRADE_FAILED
     }
 
-    @Ignore  // TODO Enable this test once the bug CPS-2474 is fixed
     def 'Module sync succeeds even if a handle gets deleted during module sync.'() {
-        given: 'cm handles in an ADVISED state'
-            def cmHandle1 = cmHandleAsDataNodeByIdAndState('cm-handle-1', CmHandleState.ADVISED)
-            def cmHandle2 = cmHandleAsDataNodeByIdAndState('cm-handle-2', CmHandleState.ADVISED)
-        and: 'inventory persistence cannot find the first handle'
-            mockInventoryPersistence.getCmHandleState('cm-handle-1') >> { throw new DataNodeNotFoundException('dataspace', 'anchor', 'xpath') }
-        and: 'inventory persistence returns the second handle with ADVISED state'
-            mockInventoryPersistence.getCmHandleState('cm-handle-2') >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
+        given: 'a cm handle which has been deleted'
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle-1') >> { throw new DataNodeNotFoundException('dataspace', 'anchor', 'cm-handle-1') }
+        and: 'a cm handle which is being deleted'
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle-2') >> cmHandleByIdAndState('cm-handle-2', CmHandleState.DELETING)
+        and: 'a cm handle in advised state'
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle-3') >> cmHandleByIdAndState('cm-handle-3', CmHandleState.ADVISED)
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle-1', 'cm-handle-2', 'cm-handle-3'], batchCount)
         then: 'no exception is thrown'
             noExceptionThrown()
         and: 'the deleted cm-handle did not sync'
             0 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
-        and: 'the existing cm-handle synced'
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-2' }
-        and: 'the state handler called'
-            1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_)
+        and: 'the deleting cm-handle did not sync'
+            0 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-2' }
+        and: 'the advised cm-handle synced'
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-3' }
+        and: 'the state handler called for only the advised handle'
+            1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >> { args ->
+                assertBatch(args, ['cm-handle-3'], CmHandleState.READY)
+            }
     }
 
     def 'Reset failed CM Handles #scenario.'() {
@@ -172,15 +172,15 @@ class ModuleSyncTasksSpec extends Specification {
 
     def 'Module Sync ADVISED cm handle without entry in progress map.'() {
         given: 'cm handles in an ADVISED state'
-            def cmHandle1 = cmHandleAsDataNodeByIdAndState('cm-handle-1', CmHandleState.ADVISED)
+            def cmHandle1 = cmHandleByIdAndState('cm-handle-1', CmHandleState.ADVISED)
         and: 'the inventory persistence cm handle returns a ADVISED state for the any handle'
-            mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle-1') >> cmHandle1
         and: 'entry in progress map for other cm handle'
             moduleSyncStartedOnCmHandles.put('other-cm-handle', 'started')
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync([cmHandle1], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle-1'], batchCount)
         then: 'module sync service is invoked for cm handle'
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_)
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' }
         and: 'the entry for other cm handle is still in the progress map'
             assert moduleSyncStartedOnCmHandles.get('other-cm-handle') != null
     }
@@ -199,11 +199,11 @@ class ModuleSyncTasksSpec extends Specification {
 
     def 'Sync and upgrade CM handle if in upgrade state for #scenario'() {
         given: 'a CM handle in an upgrade state'
-            def cmHandle = cmHandleAsDataNodeByIdAndState('cm-handle', CmHandleState.LOCKED)
-            def compositeState = new CompositeState(lockReason: CompositeState.LockReason.builder().lockReasonCategory(lockReasonCategory).build())
-            1 * mockInventoryPersistence.getCmHandleState('cm-handle') >> compositeState
+            def cmHandle = cmHandleByIdAndState('cm-handle', CmHandleState.ADVISED)
+            cmHandle.compositeState.setLockReason(CompositeState.LockReason.builder().lockReasonCategory(lockReasonCategory).build())
+            mockInventoryPersistence.getYangModelCmHandle('cm-handle') >> cmHandle
         when: 'module sync is executed'
-            objectUnderTest.performModuleSync([cmHandle], batchCount)
+            objectUnderTest.performModuleSync(['cm-handle'], batchCount)
         then: 'the module sync service should attempt to sync and upgrade the CM handle'
             1 * mockModuleSyncService.syncAndUpgradeSchemaSet(_) >> { args ->
                 assert args[0].id == 'cm-handle'
@@ -226,8 +226,8 @@ class ModuleSyncTasksSpec extends Specification {
             assert loggingEvent == null
     }
 
-    def cmHandleAsDataNodeByIdAndState(cmHandleId, cmHandleState) {
-        return new DataNode(anchorName: cmHandleId, leaves: ['id': cmHandleId, 'cm-handle-state': cmHandleState])
+    def cmHandleByIdAndState(cmHandleId, cmHandleState) {
+        return new YangModelCmHandle(id: cmHandleId, compositeState: new CompositeState(cmHandleState: cmHandleState))
     }
 
     def assertBatch(args, expectedCmHandleStatePerCmHandleIds, expectedCmHandleState) {
index b8b3e45..4cf07e4 100644 (file)
@@ -56,7 +56,7 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def 'Module sync advised cm handles with #scenario.'() {
         given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
-            mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+            mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(numberOfAdvisedCmHandles)
         and: 'module sync utilities returns no failed (locked) cm handles'
             mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
         and: 'the work queue is not locked'
@@ -79,7 +79,7 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def 'Module sync cm handles starts with no available threads.'() {
         given: 'module sync utilities returns a advise cm handles'
-            mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+            mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'the work queue is not locked'
             mockWorkQueueLock.tryLock() >> true
         and: 'the executor first has no threads but has one thread on the second attempt'
@@ -92,7 +92,7 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def 'Module sync advised cm handle already handled by other thread.'() {
         given: 'module sync utilities returns an advised cm handle'
-            mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+            mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'the work queue is not locked'
             mockWorkQueueLock.tryLock() >> true
         and: 'the executor has a thread available'
@@ -107,9 +107,9 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def 'Module sync with previous cm handle(s) left in work queue.'() {
         given: 'there is still a cm handle in the queue'
-            moduleSyncWorkQueue.offer(new DataNode())
+            moduleSyncWorkQueue.offer('ch-1')
         and: 'sync utilities returns many advise cm handles'
-            mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(500)
+            mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(500)
         and: 'the executor has plenty threads available'
             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
         when: ' module sync is started'
@@ -130,7 +130,7 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def 'Module Sync Locking.'() {
         given: 'module sync utilities returns an advised cm handle'
-            mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+            mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'can lock is : #canLock'
             mockWorkQueueLock.tryLock() >> canLock
         when: 'attempt to populate the work queue'
@@ -152,9 +152,7 @@ class ModuleSyncWatchdogSpec extends Specification {
             noExceptionThrown()
     }
 
-    def createDataNodes(numberOfDataNodes) {
-        def dataNodes = []
-        numberOfDataNodes.times { dataNodes.add(new DataNode()) }
-        return dataNodes
+    def createCmHandleIds(numberOfCmHandles) {
+        return (numberOfCmHandles > 0) ? (1..numberOfCmHandles).collect { 'ch-'+it } : []
     }
 }
index 6914273..3213e5d 100644 (file)
@@ -24,22 +24,20 @@ import com.hazelcast.collection.ISet
 import com.hazelcast.config.Config
 import com.hazelcast.core.Hazelcast
 import com.hazelcast.map.IMap
-import org.onap.cps.api.model.DataNode
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.TimeUnit
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.context.ContextConfiguration
 import spock.lang.Specification
 import spock.util.concurrent.PollingConditions
 
-import java.util.concurrent.BlockingQueue
-import java.util.concurrent.TimeUnit
-
 @SpringBootTest
 @ContextConfiguration(classes = [SynchronizationCacheConfig])
 class SynchronizationCacheConfigSpec extends Specification {
 
     @Autowired
-    BlockingQueue<DataNode> moduleSyncWorkQueue
+    BlockingQueue<String> moduleSyncWorkQueue
 
     @Autowired
     IMap<String, Object> moduleSyncStartedOnCmHandles
index 69cc087..0725fe8 100644 (file)
@@ -120,7 +120,7 @@ abstract class CpsIntegrationSpecBase extends Specification {
     ModuleSyncService moduleSyncService
 
     @Autowired
-    BlockingQueue<DataNode> moduleSyncWorkQueue
+    BlockingQueue<String> moduleSyncWorkQueue
 
     @Autowired
     JsonObjectMapper jsonObjectMapper