Distributed datastore solution for Data Sync Watchdog 99/129999/16
authorkissand <andras.zoltan.kiss@est.tech>
Thu, 21 Jul 2022 12:53:37 +0000 (14:53 +0200)
committerkissand <andras.zoltan.kiss@est.tech>
Mon, 22 Aug 2022 12:36:00 +0000 (14:36 +0200)
- update lombok config to handle Qualifier annotation
- update Semaphore config to use ConcurrentMap
- update SyncUtils to return a list of cm handles
- update DataSyncWatchdog and ModuleSyncWatchdog with Qualifier
- update DataSyncWatchdog to handle a list of cm handles
- Use get with xpath to check cm handle state

Issue-ID: CPS-1015
Change-Id: Icb39bd29f89e0020d49a1f8960476ffe81b12362
Signed-off-by: kissand <andras.zoltan.kiss@est.tech>
12 files changed:
cps-ncmp-service/lombok.config
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdogSpec.groovy [moved from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncSpec.groovy with 80% similarity]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy

index 0736fc5..b60a192 100644 (file)
@@ -1,5 +1,5 @@
 #  ============LICENSE_START=======================================================
-#  Copyright (C) 2021 Nordix Foundation
+#  Copyright (C) 2021-2022 Nordix Foundation
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -18,3 +18,4 @@
 
 config.stopBubbling = true
 lombok.addLombokGeneratedAnnotation = true
+lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file
index 6696f8e..f8836e6 100644 (file)
@@ -148,7 +148,7 @@ public class NetworkCmProxyCmHandlerQueryServiceImpl implements NetworkCmProxyCm
             cpsPathQueryResult = NO_QUERY_TO_EXECUTE;
         } else {
             try {
-                cpsPathQueryResult = cmHandleQueries.getCmHandleDataNodesByCpsPath(
+                cpsPathQueryResult = cmHandleQueries.queryCmHandleDataNodesByCpsPath(
                     cpsPath.get("cpsPath"), INCLUDE_ALL_DESCENDANTS)
                     .stream().map(this::createNcmpServiceCmHandle)
                     .collect(Collectors.toMap(NcmpServiceCmHandle::getCmHandleId,
index 1efe176..571558a 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * ============LICENSE_START========================================================
+ *  ===========LICENSE_START========================================================
  *  Copyright (C) 2022 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,7 +24,6 @@ import com.hazelcast.config.Config;
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.springframework.context.annotation.Bean;
@@ -41,23 +40,23 @@ public class SynchronizationSemaphoresConfig {
     /**
      * Module Sync Distributed Map Instance.
      *
-     * @return configured map of module sync semaphore
+     * @return configured map of module sync semaphores
      */
     @Bean
-    public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() {
-        return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig")
-                .getMap("moduleSyncSemaphore");
+    public ConcurrentMap<String, Boolean> moduleSyncSemaphores() {
+        return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig")
+                .getMap("moduleSyncSemaphores");
     }
 
     /**
      * Data Sync Distributed Map Instance.
      *
-     * @return configured map of data sync semaphore
+     * @return configured map of data sync semaphores
      */
     @Bean
-    public Map<String, String> dataSyncSemaphoreMap() {
-        return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig")
-                .getMap("dataSyncSemaphore");
+    public ConcurrentMap<String, Boolean> dataSyncSemaphores() {
+        return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig")
+                .getMap("dataSyncSemaphores");
     }
 
     private HazelcastInstance createHazelcastInstance(
index 92387ba..2451617 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.inventory;
 
 import static org.onap.cps.ncmp.api.impl.utils.YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle;
 import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS;
+import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -65,7 +66,7 @@ public class CmHandleQueries {
             final String cpsPath = "//public-properties[@name=\"" + publicPropertyQueryPair.getKey()
                 + "\" and @value=\"" + publicPropertyQueryPair.getValue() + "\"]";
 
-            final Collection<DataNode> dataNodes = getCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS);
+            final Collection<DataNode> dataNodes = queryCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS);
             if (cmHandleIdToNcmpServiceCmHandles == null) {
                 cmHandleIdToNcmpServiceCmHandles = collectDataNodesToNcmpServiceCmHandles(dataNodes);
             } else {
@@ -108,8 +109,8 @@ public class CmHandleQueries {
      * @param cmHandleState cm handle state
      * @return a list of cm handles
      */
-    public List<DataNode> getCmHandlesByState(final CmHandleState cmHandleState) {
-        return getCmHandleDataNodesByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]",
+    public List<DataNode> queryCmHandlesByState(final CmHandleState cmHandleState) {
+        return queryCmHandleDataNodesByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]",
             INCLUDE_ALL_DESCENDANTS);
     }
 
@@ -119,21 +120,23 @@ public class CmHandleQueries {
      * @param cpsPath cps path for which the cmHandle is requested
      * @return a list of data nodes representing the cm handles.
      */
-    public List<DataNode> getCmHandleDataNodesByCpsPath(final String cpsPath,
-                                                        final FetchDescendantsOption fetchDescendantsOption) {
+    public List<DataNode> queryCmHandleDataNodesByCpsPath(final String cpsPath,
+                                                          final FetchDescendantsOption fetchDescendantsOption) {
         return cpsDataPersistenceService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
             cpsPath + ANCESTOR_CM_HANDLES, fetchDescendantsOption);
     }
 
     /**
-     * Method which returns cm handles by the cm handle id and state.
+     * Method to check the state of a cm handle with given id.
+     *
      * @param cmHandleId cm handle id
-     * @param cmHandleState cm handle state
-     * @return a list of cm handles
+     * @param requiredCmHandleState the required state of the cm handle
+     * @return a boolean, true if the state is equal to the required state
      */
-    public List<DataNode> getCmHandlesByIdAndState(final String cmHandleId, final CmHandleState cmHandleState) {
-        return getCmHandleDataNodesByCpsPath("//cm-handles[@id='" + cmHandleId + "']/state[@cm-handle-state=\""
-                + cmHandleState + "\"]", FetchDescendantsOption.OMIT_DESCENDANTS);
+    public boolean cmHandleHasState(final String cmHandleId, final CmHandleState requiredCmHandleState) {
+        final DataNode stateDataNode = getCmHandleState(cmHandleId);
+        final String cmHandleStateAsString = (String) stateDataNode.getLeaves().get("cm-handle-state");
+        return CmHandleState.valueOf(cmHandleStateAsString).equals(requiredCmHandleState);
     }
 
     /**
@@ -141,8 +144,8 @@ public class CmHandleQueries {
      * @param dataStoreSyncState sync state
      * @return a list of cm handles
      */
-    public List<DataNode> getCmHandlesByOperationalSyncState(final DataStoreSyncState dataStoreSyncState) {
-        return getCmHandleDataNodesByCpsPath("//state/datastores" + "/operational[@sync-state=\""
+    public List<DataNode> queryCmHandlesByOperationalSyncState(final DataStoreSyncState dataStoreSyncState) {
+        return queryCmHandleDataNodesByCpsPath("//state/datastores" + "/operational[@sync-state=\""
                 + dataStoreSyncState + "\"]", FetchDescendantsOption.OMIT_DESCENDANTS);
     }
 
@@ -160,6 +163,12 @@ public class CmHandleQueries {
         return convertYangModelCmHandleToNcmpServiceCmHandle(YangDataConverter
             .convertCmHandleToYangModel(dataNode, dataNode.getLeaves().get("id").toString()));
     }
+
+    private DataNode getCmHandleState(final String cmHandleId) {
+        final String xpath = "/dmi-registry/cm-handles[@id='" + cmHandleId + "']/state";
+        return cpsDataPersistenceService.getDataNode(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
+                xpath, OMIT_DESCENDANTS);
+    }
 }
 
 
index 395fb01..45ba078 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * ============LICENSE_START=======================================================
+ *  ============LICENSE_START=======================================================
  *  Copyright (C) 2022 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
 package org.onap.cps.ncmp.api.inventory.sync;
 
 import java.time.OffsetDateTime;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.CpsDataService;
-import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
 import org.onap.cps.ncmp.api.inventory.CompositeState;
 import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
@@ -37,39 +38,45 @@ import org.springframework.stereotype.Service;
 @Service
 public class DataSyncWatchdog {
 
+    private static final boolean DATA_SYNC_IN_PROGRESS = false;
+    private static final boolean DATA_SYNC_DONE = true;
+
     private final InventoryPersistence inventoryPersistence;
 
     private final CpsDataService cpsDataService;
 
     private final SyncUtils syncUtils;
 
+    @Qualifier("dataSyncSemaphores")
+    private final ConcurrentMap<String, Boolean> dataSyncSemaphores;
+
     /**
      * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
      * 'UNSYNCHRONIZED'.
      */
     @Scheduled(fixedDelayString = "${timers.cm-handle-data-sync.sleep-time-ms:30000}")
     public void executeUnSynchronizedReadyCmHandlePoll() {
-        YangModelCmHandle unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle();
-        while (unSynchronizedReadyCmHandle != null) {
+        syncUtils.getUnsynchronizedReadyCmHandles().forEach(unSynchronizedReadyCmHandle -> {
             final String cmHandleId = unSynchronizedReadyCmHandle.getId();
-            log.debug("Cm-Handles found in READY and UNSYNCHRONIZED state: {}", cmHandleId);
-            final CompositeState compositeState = inventoryPersistence
-                    .getCmHandleState(cmHandleId);
-            final String resourceData = syncUtils.getResourceData(cmHandleId);
-            if (resourceData == null) {
-                log.debug("Error accessing the node for Cm-Handle: {}", cmHandleId);
-            } else if (unSynchronizedReadyCmHandle.getCompositeState().getDataSyncEnabled().equals(false)) {
-                log.debug("Error: data sync enabled for {} must be true."
-                    + "Data sync enabled is currently set to false", cmHandleId);
+            if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+                log.debug("Executing data sync on {}", cmHandleId);
+                final CompositeState compositeState = inventoryPersistence
+                        .getCmHandleState(cmHandleId);
+                final String resourceData = syncUtils.getResourceData(cmHandleId);
+                if (resourceData == null) {
+                    log.debug("Error retrieving resource data for Cm-Handle: {}", cmHandleId);
+                } else {
+                    cpsDataService.saveData("NFP-Operational", cmHandleId,
+                            resourceData, OffsetDateTime.now());
+                    setSyncStateToSynchronized().accept(compositeState);
+                    inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+                    updateDataSyncSemaphoreMap(cmHandleId);
+                }
             } else {
-                cpsDataService.saveData("NFP-Operational", cmHandleId,
-                        resourceData, OffsetDateTime.now());
-                setSyncStateToSynchronized().accept(compositeState);
-                inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+                log.debug("{} already processed by another instance", cmHandleId);
             }
-            unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle();
-        }
-        log.debug("No Cm-Handles currently found in an READY State and Operational Sync State is UNSYNCHRONIZED");
+        });
+        log.debug("No Cm-Handles currently found in READY State and Operational Sync State is UNSYNCHRONIZED");
     }
 
     private Consumer<CompositeState> setSyncStateToSynchronized() {
@@ -81,4 +88,12 @@ public class DataSyncWatchdog {
                             .lastSyncTime(CompositeState.nowInSyncTimeFormat()).build());
         };
     }
+
+    private void updateDataSyncSemaphoreMap(final String cmHandleId) {
+        dataSyncSemaphores.replace(cmHandleId, DATA_SYNC_DONE);
+    }
+
+    private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+        return dataSyncSemaphores.putIfAbsent(cmHandleId, DATA_SYNC_IN_PROGRESS) == null;
+    }
 }
index 7c2a4fc..be811a1 100644 (file)
@@ -31,6 +31,7 @@ import org.onap.cps.ncmp.api.inventory.CmHandleState;
 import org.onap.cps.ncmp.api.inventory.CompositeState;
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
 import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -39,13 +40,17 @@ import org.springframework.stereotype.Component;
 @Component
 public class ModuleSyncWatchdog {
 
+    private static final boolean MODEL_SYNC_IN_PROGRESS = false;
+    private static final boolean MODEL_SYNC_DONE = true;
+
     private final InventoryPersistence inventoryPersistence;
 
     private final SyncUtils syncUtils;
 
     private final ModuleSyncService moduleSyncService;
 
-    private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+    @Qualifier("moduleSyncSemaphores")
+    private final ConcurrentMap<String, Boolean> moduleSyncSemaphores;
 
     private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
 
@@ -100,10 +105,10 @@ public class ModuleSyncWatchdog {
     }
 
     private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
-        moduleSyncSemaphoreMap.replace(cmHandleId, true);
+        moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE);
     }
 
     private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
-        return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+        return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null;
     }
 }
index 2b7d3c9..64ce218 100644 (file)
@@ -44,7 +44,6 @@ import org.onap.cps.ncmp.api.inventory.CmHandleQueries;
 import org.onap.cps.ncmp.api.inventory.CmHandleState;
 import org.onap.cps.ncmp.api.inventory.CompositeState;
 import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
-import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
 import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
 import org.onap.cps.spi.FetchDescendantsOption;
 import org.onap.cps.spi.model.DataNode;
@@ -56,8 +55,6 @@ import org.springframework.stereotype.Service;
 @Service
 @RequiredArgsConstructor
 public class SyncUtils {
-    private final InventoryPersistence inventoryPersistence;
-
     private final CmHandleQueries cmHandleQueries;
 
     private final DmiDataOperations dmiDataOperations;
@@ -73,7 +70,7 @@ public class SyncUtils {
      */
     public List<YangModelCmHandle> getAdvisedCmHandles() {
         final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>(
-            cmHandleQueries.getCmHandlesByState(CmHandleState.ADVISED));
+                cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED));
         log.info("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size());
         if (advisedCmHandlesAsDataNodeList.isEmpty()) {
             return Collections.emptyList();
@@ -86,25 +83,26 @@ public class SyncUtils {
      * First query data nodes for cm handles with CM Handle Operational Sync State in "UNSYNCHRONIZED" and
      * randomly select a CM Handle and query the data nodes for CM Handle State in "READY".
      *
-     * @return a random yang model cm handle with State in READY and Operation Sync State in "UNSYNCHRONIZED",
-     *         return null if not found
+     * @return a randomized yang model cm handle list with State in READY and Operation Sync State in "UNSYNCHRONIZED",
+     *         return empty list if not found
      */
-    public YangModelCmHandle getAnUnSynchronizedReadyCmHandle() {
-        final List<DataNode> unSynchronizedCmHandles = cmHandleQueries
-            .getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED);
-        if (unSynchronizedCmHandles.isEmpty()) {
-            return null;
-        }
-        Collections.shuffle(unSynchronizedCmHandles);
-        for (final DataNode cmHandle : unSynchronizedCmHandles) {
-            final String cmHandleId = cmHandle.getLeaves().get("id").toString();
-            final List<DataNode> readyCmHandles = cmHandleQueries
-                .getCmHandlesByIdAndState(cmHandleId, CmHandleState.READY);
-            if (!readyCmHandles.isEmpty()) {
-                return inventoryPersistence.getYangModelCmHandle(cmHandleId);
+    public List<YangModelCmHandle> getUnsynchronizedReadyCmHandles() {
+        final List<DataNode> unsynchronizedCmHandles = cmHandleQueries
+                .queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED);
+
+        final List<YangModelCmHandle> yangModelCmHandles = new ArrayList<>();
+        for (final DataNode unsynchronizedCmHandle : unsynchronizedCmHandles) {
+            final String cmHandleId = unsynchronizedCmHandle.getLeaves().get("id").toString();
+            if (cmHandleQueries.cmHandleHasState(cmHandleId, CmHandleState.READY)) {
+                yangModelCmHandles.addAll(
+                        convertCmHandlesDataNodesToYangModelCmHandles(
+                                Collections.singletonList(unsynchronizedCmHandle)));
             }
         }
-        return null;
+
+        Collections.shuffle(yangModelCmHandles);
+
+        return yangModelCmHandles;
     }
 
     /**
@@ -113,9 +111,9 @@ public class SyncUtils {
      * @return a random LOCKED yang model cm handle, return null if not found
      */
     public List<YangModelCmHandle> getModuleSyncFailedCmHandles() {
-        final List<DataNode> lockedCmHandlesAsDataNodeList = cmHandleQueries.getCmHandleDataNodesByCpsPath(
-            "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]",
-            FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+        final List<DataNode> lockedCmHandlesAsDataNodeList = cmHandleQueries.queryCmHandleDataNodesByCpsPath(
+                "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]",
+                FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
         return convertCmHandlesDataNodesToYangModelCmHandles(lockedCmHandlesAsDataNodeList);
     }
 
@@ -136,8 +134,8 @@ public class SyncUtils {
             }
         }
         compositeState.setLockReason(CompositeState.LockReason.builder()
-            .details(String.format("Attempt #%d failed: %s", attempt, errorMessage))
-            .lockReasonCategory(lockReasonCategory).build());
+                .details(String.format("Attempt #%d failed: %s", attempt, errorMessage))
+                .lockReasonCategory(lockReasonCategory).build());
     }
 
 
@@ -150,8 +148,8 @@ public class SyncUtils {
     public boolean isReadyForRetry(final CompositeState compositeState) {
         int timeInMinutesUntilNextAttempt = 1;
         final OffsetDateTime time =
-            OffsetDateTime.parse(compositeState.getLastUpdateTime(),
-                DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+                OffsetDateTime.parse(compositeState.getLastUpdateTime(),
+                        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
         final Matcher matcher = retryAttemptPattern.matcher(compositeState.getLockReason().getDetails());
         if (matcher.find()) {
             timeInMinutesUntilNextAttempt = (int) Math.pow(2, Integer.parseInt(matcher.group(1)));
@@ -161,7 +159,7 @@ public class SyncUtils {
         final int timeSinceLastAttempt = (int) Duration.between(time, OffsetDateTime.now()).toMinutes();
         if (timeInMinutesUntilNextAttempt >= timeSinceLastAttempt) {
             log.info("Time until next attempt is {} minutes: ",
-                timeInMinutesUntilNextAttempt - timeSinceLastAttempt);
+                    timeInMinutesUntilNextAttempt - timeSinceLastAttempt);
         }
         return timeSinceLastAttempt > timeInMinutesUntilNextAttempt;
     }
@@ -174,8 +172,8 @@ public class SyncUtils {
      */
     public String getResourceData(final String cmHandleId) {
         final ResponseEntity<Object> resourceDataResponseEntity = dmiDataOperations.getResourceDataFromDmi(
-            cmHandleId, DmiOperations.DataStoreEnum.PASSTHROUGH_OPERATIONAL,
-            UUID.randomUUID().toString());
+                cmHandleId, DmiOperations.DataStoreEnum.PASSTHROUGH_OPERATIONAL,
+                UUID.randomUUID().toString());
         if (resourceDataResponseEntity.getStatusCode().is2xxSuccessful()) {
             return getFirstResource(resourceDataResponseEntity.getBody());
         }
@@ -190,9 +188,10 @@ public class SyncUtils {
         return jsonObjectMapper.asJsonString(Map.of(firstElement.getKey(), firstElement.getValue()));
     }
 
-    private List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles(
-        final List<DataNode> cmHandlesAsDataNodeList) {
-        return cmHandlesAsDataNodeList.stream().map(dataNode -> YangDataConverter.convertCmHandleToYangModel(dataNode,
-            dataNode.getLeaves().get("id").toString())).collect(Collectors.toList());
+    private static List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles(
+            final List<DataNode> cmHandlesAsDataNodeList) {
+        return cmHandlesAsDataNodeList.stream()
+                .map(cmHandle -> YangDataConverter.convertCmHandleToYangModel(cmHandle,
+                        cmHandle.getLeaves().get("id").toString())).collect(Collectors.toList());
     }
 }
index 19c5049..f1294ce 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.cps.ncmp.api.impl
 
 import org.onap.cps.cpspath.parser.PathParsingException
-import org.onap.cps.ncmp.api.NetworkCmProxyCmHandlerQueryService
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence
 import org.onap.cps.ncmp.api.inventory.CmHandleQueries
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
@@ -52,7 +51,7 @@ class NetworkCmProxyCmHandlerQueryServiceSpec extends Specification {
             def conditionProperties = createConditionProperties('cmHandleWithCpsPath', [['cpsPath' : '/some/cps/path']])
             cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties])
         and: 'cmHandleQueries returns a non null query result'
-            cmHandleQueries.getCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [new DataNode(leaves: ['id':'some-cmhandle-id'])]
+            cmHandleQueries.queryCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [new DataNode(leaves: ['id':'some-cmhandle-id'])]
         and: 'CmHandleQueries returns cmHandles with the relevant query result'
             cmHandleQueries.combineCmHandleQueries(*_) >> ['PNFDemo1': new NcmpServiceCmHandle(cmHandleId: 'PNFDemo1'), 'PNFDemo3': new NcmpServiceCmHandle(cmHandleId: 'PNFDemo3')]
         when: 'the query is executed for both cm handle ids and details'
@@ -70,7 +69,7 @@ class NetworkCmProxyCmHandlerQueryServiceSpec extends Specification {
             def conditionProperties = createConditionProperties('cmHandleWithCpsPath', [['cpsPath' : '/some/cps/path']])
             cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties])
         and: 'cmHandleQueries throws a path parsing exception'
-            cmHandleQueries.getCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> { throw thrownException }
+            cmHandleQueries.queryCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> { throw thrownException }
         when: 'the query is executed for both cm handle ids and details'
             objectUnderTest.queryCmHandleIds(cmHandleQueryParameters)
             objectUnderTest.queryCmHandles(cmHandleQueryParameters)
@@ -134,7 +133,7 @@ class NetworkCmProxyCmHandlerQueryServiceSpec extends Specification {
         and: 'cmHandles are returned from the module names query'
             inventoryPersistence.queryAnchors(['some-module-name']) >> anchorsForModuleQuery
         and: 'cmHandleQueries returns a datanode result'
-            2 * cmHandleQueries.getCmHandleDataNodesByCpsPath(*_) >> [someCmHandleDataNode]
+            2 * cmHandleQueries.queryCmHandleDataNodesByCpsPath(*_) >> [someCmHandleDataNode]
         when: 'the query is executed for both cm handle ids and details'
             def returnedCmHandlesJustIds = objectUnderTest.queryCmHandleIds(cmHandleQueryParameters)
             def returnedCmHandlesWithData = objectUnderTest.queryCmHandles(cmHandleQueryParameters)
index fe7ed9e..ea84b44 100644 (file)
@@ -31,19 +31,19 @@ import spock.lang.Specification
 class SynchronizationSemaphoresConfigSpec extends Specification {
 
     @Autowired
-    private Map<String, String> moduleSyncSemaphore;
+    private Map<String, Boolean> moduleSyncSemaphores;
 
     @Autowired
-    private Map<String, String> dataSyncSemaphore;
+    private Map<String, Boolean> dataSyncSemaphores;
 
     def 'Embedded Sync Semaphores'() {
-        expect: 'system is able to create an instance of ModuleSyncSemaphore'
-            assert null != moduleSyncSemaphore
-        and: 'system is able to create an instance of DataSyncSemaphore'
-            assert null != dataSyncSemaphore
+        expect: 'system is able to create an instance of ModuleSyncSemaphores'
+            assert null != moduleSyncSemaphores
+        and: 'system is able to create an instance of DataSyncSemaphores'
+            assert null != dataSyncSemaphores
         and: 'we have 2 instances'
             assert Hazelcast.allHazelcastInstances.size() == 2
         and: 'the names match'
-            assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphore', 'dataSyncSemaphore']
+            assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphores', 'dataSyncSemaphores']
     }
 }
index 10a5d62..ff17330 100644 (file)
@@ -92,31 +92,31 @@ class CmHandleQueriesSpec extends Specification {
             cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry',
                 '//state[@cm-handle-state="ADVISED"]/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS) >> sampleDataNodes
         when: 'cm handles are fetched by state'
-            def result = objectUnderTest.getCmHandlesByState(cmHandleState)
+            def result = objectUnderTest.queryCmHandlesByState(cmHandleState)
         then: 'the returned result matches the result from the persistence service'
             assert result == sampleDataNodes
     }
 
-    def 'Get Cm Handles By State and Cm-Handle Id'() {
+    def 'Get Cm Handles state by Cm-Handle Id'() {
         given: 'a cm handle state to query'
             def cmHandleState = CmHandleState.READY
         and: 'cps data service returns a list of data nodes'
-            cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry',
-                '//cm-handles[@id=\'some-cm-handle\']/state[@cm-handle-state="'+ 'READY'+'"]/ancestor::cm-handles', OMIT_DESCENDANTS) >> sampleDataNodes
+            cpsDataPersistenceService.getDataNode('NCMP-Admin', 'ncmp-dmi-registry',
+                '/dmi-registry/cm-handles[@id=\'some-cm-handle\']/state', OMIT_DESCENDANTS) >> new DataNode(leaves: ['cm-handle-state': 'READY'])
         when: 'cm handles are fetched by state and id'
-            def result = objectUnderTest.getCmHandlesByIdAndState('some-cm-handle', cmHandleState)
+            def result = objectUnderTest.getCmHandleState('some-cm-handle')
         then: 'the returned result is a list of data nodes returned by cps data service'
-            assert result == sampleDataNodes
+            assert result == new DataNode(leaves: ['cm-handle-state': 'READY'])
     }
 
-    def 'Get Cm Handles By Operational Sync State : UNSYNCHRONIZED'() {
+    def 'Retrieve Cm Handles By Operational Sync State : UNSYNCHRONIZED'() {
         given: 'a cm handle state to query'
             def cmHandleState = CmHandleState.READY
         and: 'cps data service returns a list of data nodes'
             cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry',
                 '//state/datastores/operational[@sync-state="'+'UNSYNCHRONIZED'+'"]/ancestor::cm-handles', OMIT_DESCENDANTS) >> sampleDataNodes
         when: 'cm handles are fetched by the UNSYNCHRONIZED operational sync state'
-            def result = objectUnderTest.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED)
+            def result = objectUnderTest.queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED)
         then: 'the returned result is a list of data nodes returned by cps data service'
             assert result == sampleDataNodes
     }
@@ -130,7 +130,7 @@ class CmHandleQueriesSpec extends Specification {
                 cpsPath + '/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS)
                 >> Arrays.asList(cmHandleDataNode)
         when: 'get cm handles by cps path is invoked'
-            def result = objectUnderTest.getCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS)
+            def result = objectUnderTest.queryCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS)
         then: 'the returned result is a list of data nodes returned by cps data service'
             assert result.contains(cmHandleDataNode)
     }
@@ -26,10 +26,11 @@ import org.onap.cps.ncmp.api.inventory.CmHandleState
 import org.onap.cps.ncmp.api.inventory.CompositeState
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence
 import org.onap.cps.ncmp.api.inventory.DataStoreSyncState
-import spock.lang.Shared
 import spock.lang.Specification
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentMap
 
-class DataSyncSpec extends Specification {
+class DataSyncWatchdogSpec extends Specification {
 
     def mockInventoryPersistence = Mock(InventoryPersistence)
 
@@ -37,10 +38,11 @@ class DataSyncSpec extends Specification {
 
     def mockSyncUtils = Mock(SyncUtils)
 
-    @Shared
+    def stubbedMap = Stub(ConcurrentMap)
+
     def jsonString = '{"stores:bookstore":{"categories":[{"code":"01"}]}}'
 
-    def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils)
+    def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils, stubbedMap as ConcurrentHashMap)
 
     def compositeState = getCompositeState()
 
@@ -52,7 +54,7 @@ class DataSyncSpec extends Specification {
         given: 'sample resource data'
             def resourceData = jsonString
         and: 'sync utilities return a cm handle twice'
-            mockSyncUtils.getAnUnSynchronizedReadyCmHandle() >>> [yangModelCmHandle1, yangModelCmHandle2, null]
+            mockSyncUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2]
         when: 'data sync poll is executed'
             objectUnderTest.executeUnSynchronizedReadyCmHandlePoll()
         then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
@@ -73,24 +75,18 @@ class DataSyncSpec extends Specification {
             1 * mockInventoryPersistence.saveCmHandleState('some-cm-handle-2', compositeState)
     }
 
-    def 'Schedule Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED which return empty data from Node because #scenario'() {
-        given: 'a yang model cm handle'
-            def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: new CompositeState(dataSyncEnabled: dataSyncEnabled))
-        and: 'sync utilities returns a single cm handle'
-            mockSyncUtils.getAnUnSynchronizedReadyCmHandle() >>> [yangModelCmHandle, null]
+    def 'Schedule Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED which return empty data from Node'() {
+        given: 'cm handles in an ready state and operational sync state in unsynchronized'
+        and: 'sync utilities return a cm handle twice'
+            mockSyncUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1]
         when: 'data sync poll is executed'
             objectUnderTest.executeUnSynchronizedReadyCmHandlePoll()
         then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
-            1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState
+            1 * mockInventoryPersistence.getCmHandleState('some-cm-handle-1') >> compositeState
         and: 'the sync util returns first resource data'
-            1 * mockSyncUtils.getResourceData('some-cm-handle') >> resourceData
+            1 * mockSyncUtils.getResourceData('some-cm-handle-1') >> null
         and: 'the cm-handle data is not saved'
             0 * mockCpsDataService.saveData('NFP-Operational', 'some-cm-handle-1', jsonString, _)
-        where:
-            scenario                                             | dataSyncEnabled | resourceData
-            'data sync is not enabled'                           | false           | jsonString
-            'resource data is null'                              | true            | null
-            'data sync is not enabled and resource data is null' | false           | null
     }
 
     def createSampleYangModelCmHandle(cmHandleId) {
@@ -100,7 +96,7 @@ class DataSyncSpec extends Specification {
 
     def getCompositeState() {
         def cmHandleState = CmHandleState.READY
-        def compositeState = new CompositeState(cmHandleState: cmHandleState, dataSyncEnabled: true)
+        def compositeState = new CompositeState(cmHandleState: cmHandleState)
         compositeState.setDataStores(CompositeState.DataStores.builder()
             .operationalDataStore(CompositeState.Operational.builder().dataStoreSyncState(DataStoreSyncState.SYNCHRONIZED)
                 .build()).build())
index fb4ca39..52fb110 100644 (file)
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.onap.cps.ncmp.api.impl.operations.DmiDataOperations
 import org.onap.cps.ncmp.api.impl.operations.DmiOperations
 import org.onap.cps.ncmp.api.inventory.CmHandleQueries
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.inventory.CmHandleState
 import org.onap.cps.ncmp.api.inventory.CompositeState
 import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
@@ -54,7 +55,7 @@ class SyncUtilsSpec extends Specification{
 
     def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
 
-    def objectUnderTest = new SyncUtils(mockInventoryPersistence, mockCmHandleQueries, mockDmiDataOperations, jsonObjectMapper)
+    def objectUnderTest = new SyncUtils(mockCmHandleQueries, mockDmiDataOperations, jsonObjectMapper)
 
     @Shared
     def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(OffsetDateTime.now())
@@ -68,7 +69,7 @@ class SyncUtilsSpec extends Specification{
 
     def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
         given: 'the inventory persistence service returns a collection of data nodes'
-            mockCmHandleQueries.getCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
+            mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
         and: 'we have some additional (dmi, private) properties'
             dataNodeAdditionalProperties.xpath = dataNode.xpath + '/additional-properties[@name="dmiProp1"]'
             dataNode.childDataNodes = [dataNodeAdditionalProperties]
@@ -106,7 +107,7 @@ class SyncUtilsSpec extends Specification{
 
     def 'Get all locked Cm-Handle where Lock Reason is LOCKED_MODULE_SYNC_FAILED cm handle #scenario'() {
         given: 'the cps (persistence service) returns a collection of data nodes'
-            mockCmHandleQueries.getCmHandleDataNodesByCpsPath(
+            mockCmHandleQueries.queryCmHandleDataNodesByCpsPath(
                 '//lock-reason[@reason="LOCKED_MODULE_SYNC_FAILED"]',
                 FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [dataNode]
         when: 'get locked Misbehaving cm handle is called'
@@ -132,21 +133,21 @@ class SyncUtilsSpec extends Specification{
     }
 
 
-    def 'Get a Cm-Handle where Operational Sync state is UnSynchronized and Cm-handle state is READY and #scenario'() {
+    def 'Get a Cm-Handle where #scenario'() {
         given: 'the inventory persistence service returns a collection of data nodes'
-            mockCmHandleQueries.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes
-            mockCmHandleQueries.getCmHandlesByIdAndState("cm-handle-123", CmHandleState.READY) >> readyDataNodes
+            mockCmHandleQueries.queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes
+            mockCmHandleQueries.cmHandleHasState('cm-handle-123', CmHandleState.READY) >> cmHandleHasState
         when: 'get advised cm handles are fetched'
-            objectUnderTest.getAnUnSynchronizedReadyCmHandle()
+            def yangModelCollection = objectUnderTest.getUnsynchronizedReadyCmHandles()
         then: 'the returned data node collection is the correct size'
-            readyDataNodes.size() == expectedDataNodeSize
-        and: 'get yang model cm handles is invoked the correct number of times'
-            expectedCallsToGetYangModelCmHandle * mockInventoryPersistence.getYangModelCmHandle('cm-handle-123')
+            yangModelCollection.size() == expectedDataNodeSize
+        and: 'the result contains the correct data'
+            yangModelCollection.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) == expectedYangModelCollectionIds
         where: 'the following scenarios are used'
-            scenario                             | unSynchronizedDataNodes | readyDataNodes || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
-            'exists'                             | [dataNode]              | [dataNode]     || 1                                   | 1
-            'unsynchronized exist but not ready' | [dataNode]              | []             || 0                                   | 0
-            'does not exist'                     | []                      | []             || 0                                   | 0
+            scenario                                   | unSynchronizedDataNodes | cmHandleHasState || expectedDataNodeSize | expectedYangModelCollectionIds
+            'a Cm-Handle unsynchronized and ready'     | [dataNode]              | true             || 1                    | ['cm-handle-123'] as Set
+            'a Cm-Handle unsynchronized but not ready' | [dataNode]              | false            || 0                    | [] as Set
+            'all Cm-Handle synchronized'               | []                      | false            || 0                    | [] as Set
     }
 
     def 'Get resource data through DMI Operations #scenario'() {