Performance Improvement: Watchdog Parallel execution with configuration
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / inventory / sync / ModuleSyncTasks.java
index 5e26650..597e2ba 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
@@ -52,28 +53,35 @@ public class ModuleSyncTasks {
      * Perform module sync on a batch of cm handles.
      *
      * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+     * @param batchCounter the number of batches currently being processed, will be decreased when task is finished
+     *                     or fails
      * @return completed future to handle post-processing
      */
-    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) {
-        final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
-        for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
-            final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
-            final YangModelCmHandle yangModelCmHandle =
-                YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
-            final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
-            try {
-                moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
-                moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
-                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
-            } catch (final Exception e) {
-                syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
-                    LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
-                setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
-                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes,
+                                                     final AtomicInteger batchCounter) {
+        try {
+            final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
+            for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
+                final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
+                final YangModelCmHandle yangModelCmHandle =
+                        YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
+                final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+                try {
+                    moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
+                    moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
+                    cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
+                } catch (final Exception e) {
+                    syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+                            LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+                    setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+                    cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+                }
+                log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
             }
-            log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+            updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
+        } finally {
+            batchCounter.getAndDecrement();
         }
-        updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
         return COMPLETED_FUTURE;
     }