Distributed datastore solution for Module Sync Watchdog
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / inventory / sync / ModuleSyncWatchdog.java
index 3f81194..c71f68f 100644 (file)
@@ -22,6 +22,7 @@
 package org.onap.cps.ncmp.api.inventory.sync;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -49,25 +50,33 @@ public class ModuleSyncWatchdog {
     @Value("${data-sync.cache.enabled:false}")
     private boolean isGlobalDataSyncCacheEnabled;
 
+    private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+
     /**
      * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
      */
     @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
     public void executeAdvisedCmHandlePoll() {
-        syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> {
+        syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
             final String cmHandleId = advisedCmHandle.getId();
-            final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
-            try {
-                moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
-                moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
-                setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
-            } catch (final Exception e) {
-                setCompositeStateToLocked().accept(compositeState);
-                syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
-                        LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+            if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+                log.debug("executing module sync on {}", cmHandleId);
+                final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+                try {
+                    moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
+                    moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
+                    setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
+                    updateModuleSyncSemaphoreMap(cmHandleId);
+                } catch (final Exception e) {
+                    setCompositeStateToLocked().accept(compositeState);
+                    syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+                            LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+                }
+                inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+                log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+            } else {
+                log.debug("{} already processed by another instance", cmHandleId);
             }
-            inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
-            log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
         });
         log.debug("No Cm-Handles currently found in an ADVISED state");
     }
@@ -119,8 +128,15 @@ public class ModuleSyncWatchdog {
 
     private CompositeState.Operational getDataStoreSyncState(final boolean dataSyncEnabled) {
         final DataStoreSyncState dataStoreSyncState = dataSyncEnabled
-            ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED;
+                ? DataStoreSyncState.UNSYNCHRONIZED : DataStoreSyncState.NONE_REQUESTED;
         return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build();
     }
 
+    private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
+        moduleSyncSemaphoreMap.replace(cmHandleId, true);
+    }
+
+    private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+        return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+    }
 }