--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cache;
+
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.map.IMap;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class CpsAndNcmpLockConfig extends HazelcastCacheConfig {
+
+ // Lock names for different use cases ( to be used as cpsAndNcmpLock keys)
+ public static final String MODULE_SYNC_WORK_QUEUE_LOCK_NAME = "workQueueLock";
+
+ private static final MapConfig cpsAndNcmpLockMapConfig = createMapConfig("cpsAndNcmpLockConfig");
+
+ /**
+ * Distributed instance used for locking purpose for various use cases in cps-and-ncmp.
+ * The key of the map entry is name of the lock and should be based on the use case we are locking.
+ *
+ * @return configured map of lock object to have distributed coordination.
+ */
+ @Bean
+ public IMap<String, String> cpsAndNcmpLock() {
+ return getOrCreateHazelcastInstance(cpsAndNcmpLockMapConfig).getMap("cpsAndNcmpLock");
+ }
+
+
+}
package org.onap.cps.ncmp.impl.inventory.sync;
+import static org.onap.cps.ncmp.impl.cache.CpsAndNcmpLockConfig.MODULE_SYNC_WORK_QUEUE_LOCK_NAME;
+
import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
- private final Lock workQueueLock;
+ private final IMap<String, String> cpsAndNcmpLock;
private final Sleeper sleeper;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
* So it can be tested without the queue being emptied immediately as the main public method does.
*/
public void populateWorkQueueIfNeeded() {
- if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) {
+ if (moduleSyncWorkQueue.isEmpty() && cpsAndNcmpLock.tryLock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME)) {
+ log.info("Lock acquired by thread : {}", Thread.currentThread().getName());
try {
populateWorkQueue();
if (moduleSyncWorkQueue.isEmpty()) {
setPreviouslyLockedCmHandlesToAdvised();
}
} finally {
- workQueueLock.unlock();
+ cpsAndNcmpLock.unlock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME);
+ log.info("Lock released by thread : {}", Thread.currentThread().getName());
}
}
}
import com.hazelcast.config.SetConfig;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
private static final SetConfig moduleSetTagsBeingProcessedConfig
= createSetConfig("moduleSetTagsBeingProcessedConfig");
- private static final String LOCK_NAME_FOR_WORK_QUEUE = "workQueueLock";
/**
* Module Sync Distributed Queue Instance.
public ISet<String> moduleSetTagsBeingProcessed() {
return getOrCreateHazelcastInstance(moduleSetTagsBeingProcessedConfig).getSet("moduleSetTagsBeingProcessed");
}
-
- /**
- * Retrieves a distributed lock used to control access to the work queue for module synchronization.
- * This lock ensures that the population and modification of the work queue are thread-safe and
- * protected from concurrent access across different nodes in the distributed system.
- * The lock guarantees that only one instance of the application can populate or modify the
- * module sync work queue at a time, preventing race conditions and potential data inconsistencies.
- * The lock is obtained using the Hazelcast CP Subsystem's {@link Lock}, which provides
- * strong consistency guarantees for distributed operations.
- *
- * @return a {@link Lock} instance used for synchronizing access to the work queue.
- */
- @Bean
- public Lock workQueueLock() {
- // TODO Method below does not use commonQueueConfig for creating lock (Refactor later)
- return getOrCreateHazelcastInstance(commonQueueConfig).getCPSubsystem().getLock(LOCK_NAME_FOR_WORK_QUEUE);
- }
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
- def mockWorkQueueLock = Mock(Lock)
+ def mockCpsAndNcmpLock = Mock(IMap<String,String>)
def spiedSleeper = Spy(Sleeper)
- def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper)
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockCpsAndNcmpLock, spiedSleeper)
void setup() {
spiedAsyncTaskExecutor.setupThreadPool()
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(numberOfAdvisedCmHandles)
and: 'module sync utilities returns no failed (locked) cm handles'
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
- and: 'the work queue is not locked'
- mockWorkQueueLock.tryLock() >> true
+ and: 'the work queue can be locked'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
and: 'the executor has enough available threads'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+ and: 'the executing thread is unlocked'
+ 1 * mockCpsAndNcmpLock.unlock('workQueueLock')
where: 'the following parameter are used'
scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
'none at all' | 0 || 0
def 'Module sync cm handles starts with no available threads.'() {
given: 'module sync utilities returns a advise cm handles'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
- and: 'the work queue is not locked'
- mockWorkQueueLock.tryLock() >> true
+ and: 'the work queue can be locked'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
and: 'the executor first has no threads but has one thread on the second attempt'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
when: ' module sync is started'
def 'Module sync advised cm handle already handled by other thread.'() {
given: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
- and: 'the work queue is not locked'
- mockWorkQueueLock.tryLock() >> true
+ and: 'the work queue can be locked'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
and: 'the executor has a thread available'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
and: 'the semaphore cache indicates the cm handle is already being processed'
def 'Module Sync Locking.'() {
given: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
- and: 'can lock is : #canLock'
- mockWorkQueueLock.tryLock() >> canLock
+ and: 'can be locked is : #canLock'
+ mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock
when: 'attempt to populate the work queue'
objectUnderTest.populateWorkQueueIfNeeded()
then: 'the queue remains empty is #expectQueueRemainsEmpty'
assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty
+ and: 'unlock is called only when thread is able to enter the critical section'
+ expectedInvocationToUnlock * mockCpsAndNcmpLock.unlock('workQueueLock')
where: 'the following lock states are applied'
- canLock | expectQueueRemainsEmpty
- false | true
- true | false
+ canLock || expectQueueRemainsEmpty || expectedInvocationToUnlock
+ false || true || 0
+ true || false || 1
}
def 'Sleeper gets interrupted.'() {
package org.onap.cps.integration.base
import com.hazelcast.collection.ISet
+import com.hazelcast.map.IMap
import okhttp3.mockwebserver.MockWebServer
import org.onap.cps.api.CpsAnchorService
import org.onap.cps.api.CpsDataService
@Autowired
BlockingQueue<String> moduleSyncWorkQueue
+ @Autowired
+ IMap<String, String> cpsAndNcmpLock
+
@Autowired
JsonObjectMapper jsonObjectMapper
}
}
+ def populateQueueWithoutDelayCallable = () -> {
+ try {
+ objectUnderTest.populateWorkQueueIfNeeded()
+ return 'task acquired the lock first'
+ } catch (InterruptedException e) {
+ e.printStackTrace()
+ }
+ }
+
def populateQueueWithDelay = () -> {
try {
Thread.sleep(10)