[CPS] Re-structuring the packages for better understanding
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / inventory / sync / ModuleSyncTasks.java
index 5e26650..914b626 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022 Nordix Foundation
+ *  Copyright (C) 2022-2023 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 
 package org.onap.cps.ncmp.api.inventory.sync;
 
+import com.hazelcast.map.IMap;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
+import org.onap.cps.ncmp.api.impl.events.lcm.LcmEventsCmHandleStateHandler;
 import org.onap.cps.ncmp.api.impl.utils.YangDataConverter;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
 import org.onap.cps.ncmp.api.inventory.CmHandleState;
@@ -45,57 +47,66 @@ public class ModuleSyncTasks {
     private final SyncUtils syncUtils;
     private final ModuleSyncService moduleSyncService;
     private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
-
-    private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);
+    private final IMap<String, Object> moduleSyncStartedOnCmHandles;
 
     /**
      * Perform module sync on a batch of cm handles.
      *
-     * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+     * @param cmHandlesAsDataNodes         a batch of Data nodes representing cm handles to perform module sync on
+     * @param batchCounter                 the number of batches currently being processed, will be decreased when
+     *                                     task is finished or fails
      * @return completed future to handle post-processing
      */
-    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) {
-        final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
-        for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
-            final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
-            final YangModelCmHandle yangModelCmHandle =
-                YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
-            final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
-            try {
-                moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
-                moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
-                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
-            } catch (final Exception e) {
-                syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
-                    LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
-                setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
-                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes,
+                                                     final AtomicInteger batchCounter) {
+        try {
+            final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
+            for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
+                final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
+                final YangModelCmHandle yangModelCmHandle =
+                        YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
+                final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+                try {
+                    moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
+                    moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
+                    cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
+                } catch (final Exception e) {
+                    log.warn("Processing of {} module sync failed due to reason {}.", cmHandleId, e.getMessage());
+                    syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+                            LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+                    setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+                    cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+                }
+                log.info("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
             }
-            log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+            lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandelStatePerCmHandle);
+        } finally {
+            batchCounter.getAndDecrement();
+            log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
         }
-        updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
-        return COMPLETED_FUTURE;
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
      * Reset state to "ADVISED" for any previously failed cm handles.
      *
      * @param failedCmHandles previously failed (locked) cm handles
-     * @return completed future to handle post-processing
      */
-    public CompletableFuture<Void> resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) {
+    public void resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) {
         final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size());
         for (final YangModelCmHandle failedCmHandle : failedCmHandles) {
             final CompositeState compositeState = failedCmHandle.getCompositeState();
             final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
+            log.info("Retry for cmHandleId : {} is {}", failedCmHandle.getId(), isReadyForRetry);
             if (isReadyForRetry) {
+                final String resetCmHandleId = failedCmHandle.getId();
                 log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog",
-                    failedCmHandle.getId());
+                        resetCmHandleId);
                 cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED);
+                removeResetCmHandleFromModuleSyncMap(resetCmHandleId);
             }
         }
-        updateCmHandlesStateBatch(cmHandleStatePerCmHandle);
-        return COMPLETED_FUTURE;
+        lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
     }
 
     private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
@@ -103,11 +114,9 @@ public class ModuleSyncTasks {
         advisedCmHandle.getCompositeState().setLockReason(lockReason);
     }
 
-    private void updateCmHandlesStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) {
-        // To be refactored as part of CPS-1231; Use state-save-batch capability (depends sub-task12, 13)
-        for (final Map.Entry<YangModelCmHandle, CmHandleState> entry : cmHandleStatePerCmHandle.entrySet()) {
-            lcmEventsCmHandleStateHandler.updateCmHandleState(entry.getKey(), entry.getValue());
+    private void removeResetCmHandleFromModuleSyncMap(final String resetCmHandleId) {
+        if (moduleSyncStartedOnCmHandles.remove(resetCmHandleId) != null) {
+            log.info("{} removed from in progress map", resetCmHandleId);
         }
     }
-
 }