TTL for module and data sync 16/131416/8
authormpriyank <priyank.maheshwari@est.tech>
Mon, 10 Oct 2022 15:05:53 +0000 (16:05 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Thu, 13 Oct 2022 14:57:09 +0000 (15:57 +0100)
- Added configurable ttl parameters for module sync and data sync in the
  application yaml
- Changing strategy to set the TTLs now. Its been set for each key at
  the inserting time only for both the maps.
- Added test scenarios to verify the configs and the TTLs.

Issue-ID: CPS-1288
Change-Id: I0a95cbd1a3e540ff15e23027e79e07e9a26f4c19
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdogSpec.groovy

index c89388b..5154be7 100644 (file)
@@ -28,7 +28,6 @@ import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.map.IMap;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 import org.onap.cps.spi.model.DataNode;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -39,11 +38,12 @@ import org.springframework.context.annotation.Configuration;
 @Configuration
 public class SynchronizationCacheConfig {
 
+    public static final int MODULE_SYNC_STARTED_TTL_SECS = 60;
+    public static final int DATA_SYNC_SEMAPHORE_TTL_SECS = 1800;
+
     private static final QueueConfig commonQueueConfig = createQueueConfig();
-    private static final MapConfig moduleSyncStartedConfig =
-        createMapConfig("moduleSyncStartedConfig", TimeUnit.MINUTES.toSeconds(1));
-    private static final MapConfig dataSyncSemaphoresConfig =
-        createMapConfig("dataSyncSemaphoresConfig", TimeUnit.MINUTES.toSeconds(30));
+    private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig");
+    private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
 
     /**
      * Module Sync Distributed Queue Instance.
@@ -102,11 +102,10 @@ public class SynchronizationCacheConfig {
         return commonQueueConfig;
     }
 
-    private static MapConfig createMapConfig(final String configName, final long timeToLiveSeconds) {
+    private static MapConfig createMapConfig(final String configName) {
         final MapConfig mapConfig = new MapConfig(configName);
         mapConfig.setBackupCount(3);
         mapConfig.setAsyncBackupCount(3);
-        mapConfig.setTimeToLiveSeconds((int) timeToLiveSeconds);
         return mapConfig;
     }
 
index 9336c3b..9fa75a0 100644 (file)
 
 package org.onap.cps.ncmp.api.inventory.sync;
 
+import com.hazelcast.map.IMap;
 import java.time.OffsetDateTime;
-import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.api.CpsDataService;
+import org.onap.cps.ncmp.api.impl.config.embeddedcache.SynchronizationCacheConfig;
 import org.onap.cps.ncmp.api.inventory.CompositeState;
 import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
@@ -46,7 +48,7 @@ public class DataSyncWatchdog {
 
     private final SyncUtils syncUtils;
 
-    private final Map<String, Boolean> dataSyncSemaphores;
+    private final IMap<String, Boolean> dataSyncSemaphores;
 
     /**
      * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
@@ -92,6 +94,7 @@ public class DataSyncWatchdog {
     }
 
     private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
-        return dataSyncSemaphores.putIfAbsent(cmHandleId, DATA_SYNC_IN_PROGRESS) == null;
+        return dataSyncSemaphores.putIfAbsent(cmHandleId, DATA_SYNC_IN_PROGRESS,
+                SynchronizationCacheConfig.DATA_SYNC_SEMAPHORE_TTL_SECS, TimeUnit.SECONDS) == null;
     }
 }
index b96889f..f629b71 100644 (file)
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.config.embeddedcache.SynchronizationCacheConfig;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
 import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor;
 import org.onap.cps.spi.model.DataNode;
@@ -117,8 +118,9 @@ public class ModuleSyncWatchdog {
         log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size());
         for (final DataNode batchCandidate : nextBatchCandidates) {
             final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
-            final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP
-                .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP));
+            final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP.equals(
+                    moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP,
+                            SynchronizationCacheConfig.MODULE_SYNC_STARTED_TTL_SECS, TimeUnit.SECONDS));
             if (alreadyAddedToInProgressMap) {
                 log.debug("module sync for {} already in progress by other instance", cmHandleId);
             } else {
index 4cfc02b..c16d6b6 100644 (file)
@@ -28,6 +28,7 @@ import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.context.ContextConfiguration
 import spock.lang.Specification
 import java.util.concurrent.BlockingQueue
+import java.util.concurrent.TimeUnit
 
 @SpringBootTest
 @ContextConfiguration(classes = [SynchronizationCacheConfig])
@@ -40,7 +41,7 @@ class SynchronizationCacheConfigSpec extends Specification {
     private IMap<String, Object> moduleSyncStartedOnCmHandles
 
     @Autowired
-    private Map<String, Boolean> dataSyncSemaphores
+    private IMap<String, Boolean> dataSyncSemaphores
 
     def 'Embedded (hazelcast) Caches for Module and Data Sync.'() {
         expect: 'system is able to create an instance of the Module Sync Work Queue'
@@ -54,4 +55,36 @@ class SynchronizationCacheConfigSpec extends Specification {
         and: 'they have the correct names (in any order)'
             assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' )
     }
+
+    def 'Verify configs for Distributed objects'(){
+        given: 'the Module Sync Work Queue config'
+            def queueConfig =  Hazelcast.getHazelcastInstanceByName('moduleSyncWorkQueue').config.queueConfigs.get('defaultQueueConfig')
+        and: 'the Module Sync Started Cm Handle Map config'
+            def moduleSyncStartedOnCmHandlesConfig =  Hazelcast.getHazelcastInstanceByName('moduleSyncStartedOnCmHandles').config.mapConfigs.get('moduleSyncStartedConfig')
+        and: 'the Data Sync Semaphores Map config'
+            def dataSyncSemaphoresConfig =  Hazelcast.getHazelcastInstanceByName('dataSyncSemaphores').config.mapConfigs.get('dataSyncSemaphoresConfig')
+        expect: 'system created instance with correct config of Module Sync Work Queue'
+            assert queueConfig.backupCount == 3
+            assert queueConfig.asyncBackupCount == 3
+        and: 'Module Sync Started Cm Handle Map has the correct settings'
+            assert moduleSyncStartedOnCmHandlesConfig.backupCount == 3
+            assert moduleSyncStartedOnCmHandlesConfig.asyncBackupCount == 3
+        and: 'Data Sync Semaphore Map has the correct settings'
+            assert dataSyncSemaphoresConfig.backupCount == 3
+            assert dataSyncSemaphoresConfig.asyncBackupCount == 3
+    }
+
+    def 'Time to Live Verify for Module Sync and Data Sync Semaphore'() {
+        when: 'the keys are inserted with a TTL'
+            moduleSyncStartedOnCmHandles.put('testKeyModuleSync', 'toBeExpired' as Object, 1000, TimeUnit.MILLISECONDS)
+            dataSyncSemaphores.put('testKeyDataSync', Boolean.TRUE, 1000, TimeUnit.MILLISECONDS)
+        then: 'the entries are present in the map'
+            assert moduleSyncStartedOnCmHandles.get('testKeyModuleSync') != null
+            assert dataSyncSemaphores.get('testKeyDataSync') != null
+        and: 'we wait for the key expiration'
+            sleep(1500)
+        and: 'the keys should be expired as TTL elapsed'
+            assert moduleSyncStartedOnCmHandles.get('testKeyModuleSync') == null
+            assert dataSyncSemaphores.get('testKeyDataSync') == null
+    }
 }
index 6053819..707f3ea 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.cps.ncmp.api.inventory.sync
 
+import com.hazelcast.map.IMap
 import org.onap.cps.api.CpsDataService
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.inventory.CmHandleState
@@ -27,8 +28,6 @@ import org.onap.cps.ncmp.api.inventory.CompositeState
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence
 import org.onap.cps.ncmp.api.inventory.DataStoreSyncState
 import spock.lang.Specification
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.ConcurrentMap
 
 class DataSyncWatchdogSpec extends Specification {
 
@@ -38,11 +37,11 @@ class DataSyncWatchdogSpec extends Specification {
 
     def mockSyncUtils = Mock(SyncUtils)
 
-    def stubbedMap = Stub(ConcurrentMap)
+    def mockDataSyncSemaphoreMap = Mock(IMap<String,Boolean>)
 
     def jsonString = '{"stores:bookstore":{"categories":[{"code":"01"}]}}'
 
-    def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils, stubbedMap as ConcurrentHashMap)
+    def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils, mockDataSyncSemaphoreMap)
 
     def compositeState = getCompositeState()