Distributed datastore solution for Module Sync Watchdog 22/129922/16
authorkissand <andras.zoltan.kiss@est.tech>
Thu, 14 Jul 2022 10:37:14 +0000 (12:37 +0200)
committerkissand <andras.zoltan.kiss@est.tech>
Wed, 27 Jul 2022 12:23:19 +0000 (14:23 +0200)
- use semaphore map in ModuleSyncWatchdog
- increase test timeout, because it needs more time for hazelcast
initialization

Issue-ID: CPS-1015
Change-Id: I71feed8fbbd047af9fabba29a5f762a1f17a1c78
Signed-off-by: kissand <andras.zoltan.kiss@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
cps-ri/src/test/groovy/org/onap/cps/spi/utils/SessionManagerIntegrationSpec.groovy

index 978c3d1..1efe176 100644 (file)
@@ -23,7 +23,10 @@ package org.onap.cps.ncmp.api.impl.config.embeddedcache;
 import com.hazelcast.config.Config;
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -33,32 +36,40 @@ import org.springframework.context.annotation.Configuration;
 @Configuration
 public class SynchronizationSemaphoresConfig {
 
+    private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30);
+
     /**
      * Module Sync Distributed Map Instance.
-     * @return  Instance of Map
+     *
+     * @return configured map of module sync semaphore
      */
     @Bean
-    public Map<String, String> moduleSyncSemaphore() {
-        return Hazelcast.newHazelcastInstance(
-                initializeDefaultMapConfig("moduleSyncSemaphore", "moduleSyncSemaphoreConfig"))
+    public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() {
+        return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig")
                 .getMap("moduleSyncSemaphore");
     }
 
     /**
      * Data Sync Distributed Map Instance.
-     * @return  Instance of Map
+     *
+     * @return configured map of data sync semaphore
      */
     @Bean
-    public Map<String, String> dataSyncSemaphore() {
-        return Hazelcast.newHazelcastInstance(
-                initializeDefaultMapConfig("dataSyncSemaphore", "dataSyncSemaphoreConfig"))
+    public Map<String, String> dataSyncSemaphoreMap() {
+        return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig")
                 .getMap("dataSyncSemaphore");
     }
 
+    private HazelcastInstance createHazelcastInstance(
+            final String hazelcastInstanceName, final String configMapName) {
+        return Hazelcast.newHazelcastInstance(
+                initializeDefaultMapConfig(hazelcastInstanceName, configMapName));
+    }
+
     private Config initializeDefaultMapConfig(final String instanceName, final String configName) {
         final Config config = new Config(instanceName);
         final MapConfig mapConfig = new MapConfig(configName);
-        mapConfig.setTimeToLiveSeconds(30);
+        mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS);
         mapConfig.setBackupCount(3);
         mapConfig.setAsyncBackupCount(3);
         config.addMapConfig(mapConfig);
index 3f81194..c71f68f 100644 (file)
@@ -22,6 +22,7 @@
 package org.onap.cps.ncmp.api.inventory.sync;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -49,25 +50,33 @@ public class ModuleSyncWatchdog {
     @Value("${data-sync.cache.enabled:false}")
     private boolean isGlobalDataSyncCacheEnabled;
 
+    private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+
     /**
      * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
      */
     @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
     public void executeAdvisedCmHandlePoll() {
-        syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> {
+        syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
             final String cmHandleId = advisedCmHandle.getId();
-            final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
-            try {
-                moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
-                moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
-                setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
-            } catch (final Exception e) {
-                setCompositeStateToLocked().accept(compositeState);
-                syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
-                        LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+            if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+                log.debug("executing module sync on {}", cmHandleId);
+                final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+                try {
+                    moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
+                    moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
+                    setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
+                    updateModuleSyncSemaphoreMap(cmHandleId);
+                } catch (final Exception e) {
+                    setCompositeStateToLocked().accept(compositeState);
+                    syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+                            LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+                }
+                inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+                log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+            } else {
+                log.debug("{} already processed by another instance", cmHandleId);
             }
-            inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
-            log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
         });
         log.debug("No Cm-Handles currently found in an ADVISED state");
     }
@@ -119,8 +128,15 @@ public class ModuleSyncWatchdog {
 
     private CompositeState.Operational getDataStoreSyncState(final boolean dataSyncEnabled) {
         final DataStoreSyncState dataStoreSyncState = dataSyncEnabled
-            ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED;
+                ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED;
         return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build();
     }
 
+    private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
+        moduleSyncSemaphoreMap.replace(cmHandleId, true);
+    }
+
+    private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+        return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+    }
 }
index 40a0e39..7455438 100644 (file)
@@ -30,6 +30,9 @@ import org.onap.cps.ncmp.api.inventory.LockReasonCategory
 import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
 import spock.lang.Specification
 
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentMap
+
 class ModuleSyncWatchdogSpec extends Specification {
 
     def mockInventoryPersistence = Mock(InventoryPersistence)
@@ -38,9 +41,11 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def mockModuleSyncService = Mock(ModuleSyncService)
 
+    def stubbedMap = Stub(ConcurrentMap)
+
     def cmHandleState = CmHandleState.ADVISED
 
-    def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService)
+    def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap)
 
     def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles where #scenario'() {
         given: 'cm handles in an advised state and a data sync state'
index a1f6d58..ceb9dd4 100644 (file)
@@ -37,7 +37,7 @@ class SessionManagerIntegrationSpec extends CpsPersistenceSpecBase{
     CpsSessionFactory cpsSessionFactory
 
     def sessionId
-    def shortTimeoutForTesting = 200L
+    def shortTimeoutForTesting = 300L
 
     def setup(){
         sessionId = objectUnderTest.startSession()