Merge "Enable/Disable Data Sync for Cm Handle"
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / inventory / sync / ModuleSyncWatchdog.java
index 6ec4419..37bd1ed 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * ============LICENSE_START=======================================================
+ *  ============LICENSE_START=======================================================
  *  Copyright (C) 2022 Nordix Foundation
  *  Modifications Copyright (C) 2022 Bell Canada
  *  ================================================================================
@@ -22,6 +22,7 @@
 package org.onap.cps.ncmp.api.inventory.sync;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -45,28 +46,34 @@ public class ModuleSyncWatchdog {
 
     private final ModuleSyncService moduleSyncService;
 
+    private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+
     /**
      * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
      */
     @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
     public void executeAdvisedCmHandlePoll() {
-        YangModelCmHandle advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
-        while (advisedCmHandle != null) {
+        syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
             final String cmHandleId = advisedCmHandle.getId();
-            final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
-            try {
-                moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
-                moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
-                setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
-            } catch (final Exception e) {
-                setCompositeStateToLocked().accept(compositeState);
-                syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
-                        LockReasonCategory.LOCKED_MISBEHAVING, e.getMessage());
+            if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+                log.debug("executing module sync on {}", cmHandleId);
+                final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+                try {
+                    moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
+                    moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
+                    setCompositeStateToReadyWithInitialDataStoreSyncState().accept(compositeState);
+                    updateModuleSyncSemaphoreMap(cmHandleId);
+                } catch (final Exception e) {
+                    setCompositeStateToLocked().accept(compositeState);
+                    syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+                            LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+                }
+                inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+                log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+            } else {
+                log.debug("{} already processed by another instance", cmHandleId);
             }
-            inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
-            log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
-            advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
-        }
+        });
         log.debug("No Cm-Handles currently found in an ADVISED state");
     }
 
@@ -75,14 +82,14 @@ public class ModuleSyncWatchdog {
      */
     @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
     public void executeLockedCmHandlePoll() {
-        final List<YangModelCmHandle> lockedMisbehavingCmHandles = syncUtils.getLockedMisbehavingYangModelCmHandles();
-        for (final YangModelCmHandle moduleSyncFailedCmHandle : lockedMisbehavingCmHandles) {
-            final CompositeState compositeState = moduleSyncFailedCmHandle.getCompositeState();
+        final List<YangModelCmHandle> lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
+        for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) {
+            final CompositeState compositeState = lockedCmHandle.getCompositeState();
             final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
             if (isReadyForRetry) {
                 setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState);
-                log.debug("Locked misbehaving cm handle {} is being recycled", moduleSyncFailedCmHandle.getId());
-                inventoryPersistence.saveCmHandleState(moduleSyncFailedCmHandle.getId(), compositeState);
+                log.debug("Locked cm handle {} is being re-synced", lockedCmHandle.getId());
+                inventoryPersistence.saveCmHandleState(lockedCmHandle.getId(), compositeState);
             }
         }
     }
@@ -96,11 +103,9 @@ public class ModuleSyncWatchdog {
 
     private Consumer<CompositeState> setCompositeStateToReadyWithInitialDataStoreSyncState() {
         return compositeState -> {
+            compositeState.setDataSyncEnabled(false);
             compositeState.setCmHandleState(CmHandleState.READY);
-            final CompositeState.Operational operational = CompositeState.Operational.builder()
-                    .dataStoreSyncState(DataStoreSyncState.UNSYNCHRONIZED)
-                    .lastSyncTime(CompositeState.nowInSyncTimeFormat())
-                    .build();
+            final CompositeState.Operational operational = getDataStoreSyncState();
             final CompositeState.DataStores dataStores = CompositeState.DataStores.builder()
                     .operationalDataStore(operational)
                     .build();
@@ -116,4 +121,17 @@ public class ModuleSyncWatchdog {
                 .details(oldLockReasonDetails).build();
         compositeState.setLockReason(lockReason);
     }
+
+    private CompositeState.Operational getDataStoreSyncState() {
+        final DataStoreSyncState dataStoreSyncState = DataStoreSyncState.NONE_REQUESTED;
+        return CompositeState.Operational.builder().dataStoreSyncState(dataStoreSyncState).build();
+    }
+
+    private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
+        moduleSyncSemaphoreMap.replace(cmHandleId, true);
+    }
+
+    private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+        return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+    }
 }