Cps and Ncmp distributed lock for various use cases 83/139583/13
authormpriyank <priyank.maheshwari@est.tech>
Wed, 27 Nov 2024 14:43:28 +0000 (14:43 +0000)
committermpriyank <priyank.maheshwari@est.tech>
Thu, 19 Dec 2024 15:38:36 +0000 (15:38 +0000)
- introduced cpsAndNcmpLock to be used for any use case needing
  coordination. Since it can be used for any use case hence placing the
  class accordingly.
- currently lock is being used for populating workQueue.
- Removed FencedLock as it was part of CPSubsystem which is moved to
  hazelcast-enterprise in 5.5.* version onwards.
- added info level logging statement to verify just one thread at a time
  in the critical section
- Note : integration test to be part of a separate patch.

Issue-ID: CPS-2479
Change-Id: I0f33c7232786c517383e5093fda91fd9a1839021
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy

diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java
new file mode 100644 (file)
index 0000000..61cf939
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cache;
+
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.map.IMap;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class CpsAndNcmpLockConfig extends HazelcastCacheConfig {
+
+    // Lock names for different use cases ( to be used as cpsAndNcmpLock keys)
+    public static final String MODULE_SYNC_WORK_QUEUE_LOCK_NAME = "workQueueLock";
+
+    private static final MapConfig cpsAndNcmpLockMapConfig = createMapConfig("cpsAndNcmpLockConfig");
+
+    /**
+     * Distributed instance used for locking purpose for various use cases in cps-and-ncmp.
+     * The key of the map entry is name of the lock and should be based on the use case we are locking.
+     *
+     * @return configured map of lock object to have distributed coordination.
+     */
+    @Bean
+    public IMap<String, String> cpsAndNcmpLock() {
+        return getOrCreateHazelcastInstance(cpsAndNcmpLockMapConfig).getMap("cpsAndNcmpLock");
+    }
+
+
+}
index 3f2bb4f..5b71a8a 100644 (file)
 
 package org.onap.cps.ncmp.impl.inventory.sync;
 
+import static org.onap.cps.ncmp.impl.cache.CpsAndNcmpLockConfig.MODULE_SYNC_WORK_QUEUE_LOCK_NAME;
+
 import com.hazelcast.map.IMap;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -46,7 +47,7 @@ public class ModuleSyncWatchdog {
     private final IMap<String, Object> moduleSyncStartedOnCmHandles;
     private final ModuleSyncTasks moduleSyncTasks;
     private final AsyncTaskExecutor asyncTaskExecutor;
-    private final Lock workQueueLock;
+    private final IMap<String, String> cpsAndNcmpLock;
     private final Sleeper sleeper;
 
     private static final int MODULE_SYNC_BATCH_SIZE = 100;
@@ -90,14 +91,16 @@ public class ModuleSyncWatchdog {
      * So it can be tested without the queue being emptied immediately as the main public method does.
      */
     public void populateWorkQueueIfNeeded() {
-        if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) {
+        if (moduleSyncWorkQueue.isEmpty() && cpsAndNcmpLock.tryLock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME)) {
+            log.info("Lock acquired by thread : {}", Thread.currentThread().getName());
             try {
                 populateWorkQueue();
                 if (moduleSyncWorkQueue.isEmpty()) {
                     setPreviouslyLockedCmHandlesToAdvised();
                 }
             } finally {
-                workQueueLock.unlock();
+                cpsAndNcmpLock.unlock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME);
+                log.info("Lock released by thread : {}", Thread.currentThread().getName());
             }
         }
     }
index def8f37..d6ac242 100644 (file)
@@ -26,7 +26,6 @@ import com.hazelcast.config.QueueConfig;
 import com.hazelcast.config.SetConfig;
 import com.hazelcast.map.IMap;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.locks.Lock;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
 import org.springframework.context.annotation.Bean;
@@ -48,7 +47,6 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
     private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
     private static final SetConfig moduleSetTagsBeingProcessedConfig
         = createSetConfig("moduleSetTagsBeingProcessedConfig");
-    private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock";
 
     /**
      * Module Sync Distributed Queue Instance.
@@ -90,21 +88,4 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
     public ISet<String> moduleSetTagsBeingProcessed() {
         return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed");
     }
-
-    /**
-     * Retrieves a distributed lock used to control access to the work queue for module synchronization.
-     * This lock ensures that the population and modification of the work queue are thread-safe and
-     * protected from concurrent access across different nodes in the distributed system.
-     * The lock guarantees that only one instance of the application can populate or modify the
-     * module sync work queue at a time, preventing race conditions and potential data inconsistencies.
-     * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides
-     * strong consistency guarantees for distributed operations.
-     *
-     * @return a {@link Lock} instance used for synchronizing access to the work queue.
-     */
-    @Bean
-    public Lock workQueueLock() {
-        // TODO Method below does not use commonQueueConfig for creating lock (Refactor later)
-        return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE);
-    }
 }
index 4cf07e4..a9b88c2 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2023 Nordix Foundation
+ *  Copyright (C) 2022-2024 Nordix Foundation
  *  Modifications Copyright (C) 2022 Bell Canada
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -44,11 +44,11 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
 
-    def mockWorkQueueLock = Mock(Lock)
+    def mockCpsAndNcmpLock = Mock(IMap<String,String>)
 
     def spiedSleeper = Spy(Sleeper)
 
-    def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper)
+    def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper)
 
     void setup() {
         spiedAsyncTaskExecutor.setupThreadPool()
@@ -59,14 +59,16 @@ class ModuleSyncWatchdogSpec extends Specification {
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(numberOfAdvisedCmHandles)
         and: 'module sync utilities returns no failed (locked) cm handles'
             mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
-        and: 'the work queue is not locked'
-            mockWorkQueueLock.tryLock() >> true
+        and: 'the work queue can be locked'
+            mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
         and: 'the executor has enough available threads'
             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
         when: ' module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it performs #expectedNumberOfTaskExecutions tasks'
             expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+        and: 'the executing thread is unlocked'
+            1 * mockCpsAndNcmpLock.unlock('workQueueLock')
         where: 'the following parameter are used'
             scenario              | numberOfAdvisedCmHandles                                          || expectedNumberOfTaskExecutions
             'none at all'         | 0                                                                 || 0
@@ -80,8 +82,8 @@ class ModuleSyncWatchdogSpec extends Specification {
     def 'Module sync cm handles starts with no available threads.'() {
         given: 'module sync utilities returns a advise cm handles'
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
-        and: 'the work queue is not locked'
-            mockWorkQueueLock.tryLock() >> true
+        and: 'the work queue can be locked'
+            mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
         and: 'the executor first has no threads but has one thread on the second attempt'
             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
         when: ' module sync is started'
@@ -93,8 +95,8 @@ class ModuleSyncWatchdogSpec extends Specification {
     def 'Module sync advised cm handle already handled by other thread.'() {
         given: 'module sync utilities returns an advised cm handle'
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
-        and: 'the work queue is not locked'
-            mockWorkQueueLock.tryLock() >> true
+        and: 'the work queue can be locked'
+            mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
         and: 'the executor has a thread available'
             spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
         and: 'the semaphore cache indicates the cm handle is already being processed'
@@ -131,16 +133,18 @@ class ModuleSyncWatchdogSpec extends Specification {
     def 'Module Sync Locking.'() {
         given: 'module sync utilities returns an advised cm handle'
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
-        and: 'can lock is : #canLock'
-            mockWorkQueueLock.tryLock() >> canLock
+        and: 'can be locked is : #canLock'
+            mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock
         when: 'attempt to populate the work queue'
             objectUnderTest.populateWorkQueueIfNeeded()
         then: 'the queue remains empty is #expectQueueRemainsEmpty'
             assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty
+        and: 'unlock is called only when thread is able to enter the critical section'
+            expectedInvocationToUnlock * mockCpsAndNcmpLock.unlock('workQueueLock')
         where: 'the following lock states are applied'
-            canLock | expectQueueRemainsEmpty
-            false   | true
-            true    | false
+            canLock || expectQueueRemainsEmpty || expectedInvocationToUnlock
+            false   || true                    || 0
+            true    || false                   || 1
     }
 
     def 'Sleeper gets interrupted.'() {
index 9fc3633..16b4460 100644 (file)
@@ -22,6 +22,7 @@
 package org.onap.cps.integration.base
 
 import com.hazelcast.collection.ISet
+import com.hazelcast.map.IMap
 import okhttp3.mockwebserver.MockWebServer
 import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDataService
@@ -122,6 +123,9 @@ abstract class CpsIntegrationSpecBase extends Specification {
     @Autowired
     BlockingQueue<String> moduleSyncWorkQueue
 
+    @Autowired
+    IMap<String, String> cpsAndNcmpLock
+
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
index 43bcbdb..a6e56ab 100644 (file)
@@ -151,6 +151,15 @@ class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
         }
     }
 
+    def populateQueueWithoutDelayCallable = () -> {
+        try {
+            objectUnderTest.populateWorkQueueIfNeeded()
+            return 'task acquired the lock first'
+        } catch (InterruptedException e) {
+            e.printStackTrace()
+        }
+    }
+
     def populateQueueWithDelay = () -> {
         try {
             Thread.sleep(10)