Introduce Model Loader Locking Mechanism 50/142250/10
authorToineSiebelink <toine.siebelink@est.tech>
Tue, 28 Oct 2025 13:36:11 +0000 (13:36 +0000)
committerToineSiebelink <toine.siebelink@est.tech>
Tue, 28 Oct 2025 14:10:31 +0000 (14:10 +0000)
- Moved core hazelcast config class and parameters from ncmp to cps service
- Renamed CpsAndNcmp lock classes and config to 'CpsCommonLocks' for common use
- Moved (duplicated) test to correct test base class
- Introducing Model Loader Coordination classes
- Update lombok config in cps-service to include qualifier annotations
- Update model loaders to only execute when master

Issue-ID:CPS-2989
Change-Id: I8d846e589e362d168bd95c454cee3b2c195a1ec7
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
33 files changed:
cps-ncmp-service/pom.xml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/AdminCacheConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/AlternateIdCacheConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cache/CmSubscriptionConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/AlternateIdChecker.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelCacheConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoader.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/InventoryModelLoader.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/SynchronizationCacheConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/trustlevel/TrustLevelCacheConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/init/CmDataSubscriptionModelLoaderSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/init/InventoryModelLoaderSpec.groovy
cps-service/lombok.config
cps-service/pom.xml
cps-service/src/main/java/org/onap/cps/impl/cache/CpsCommonLocksConfig.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/CpsAndNcmpLockConfig.java with 67% similarity]
cps-service/src/main/java/org/onap/cps/impl/cache/HazelcastCacheConfig.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfig.java with 99% similarity]
cps-service/src/main/java/org/onap/cps/init/AbstractModelLoader.java
cps-service/src/main/java/org/onap/cps/init/CpsNotificationSubscriptionModelLoader.java
cps-service/src/main/java/org/onap/cps/init/ModelLoaderCoordinatorEnd.java [new file with mode: 0644]
cps-service/src/main/java/org/onap/cps/init/ModelLoaderCoordinatorLock.java [new file with mode: 0644]
cps-service/src/main/java/org/onap/cps/utils/Sleeper.java [new file with mode: 0644]
cps-service/src/test/groovy/org/onap/cps/impl/cache/CpsCommonLocksConfigSpec.groovy [new file with mode: 0644]
cps-service/src/test/groovy/org/onap/cps/impl/cache/HazelcastCacheConfigSpec.groovy [moved from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cache/HazelcastCacheConfigSpec.groovy with 88% similarity]
cps-service/src/test/groovy/org/onap/cps/init/AbstractModelLoaderSpec.groovy
cps-service/src/test/groovy/org/onap/cps/init/CpsNotificationSubscriptionModelLoaderSpec.groovy
cps-service/src/test/groovy/org/onap/cps/init/ModelLoaderCoordinatorEndSpec.groovy [new file with mode: 0644]
cps-service/src/test/groovy/org/onap/cps/init/ModelLoaderCoordinatorLockSpec.groovy [new file with mode: 0644]
cps-service/src/test/resources/application.yml
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy

index 607f2cd..756b997 100644 (file)
             <groupId>${project.groupId}</groupId>
             <artifactId>cps-path-parser</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.hazelcast</groupId>
-            <artifactId>hazelcast-spring</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.mapstruct</groupId>
             <artifactId>mapstruct</artifactId>
index 05f96c8..c4716a4 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.impl.cache;
 
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.map.IMap;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -37,7 +38,6 @@ public class AdminCacheConfig extends HazelcastCacheConfig {
      */
     @Bean
     public IMap<String, Integer> cmHandlesByState() {
-        return getOrCreateHazelcastInstance(cmHandleStateCacheMapConfig).getMap(
-                "cmHandlesByState");
+        return getOrCreateHazelcastInstance(cmHandleStateCacheMapConfig).getMap("cmHandlesByState");
     }
 }
index b31bceb..4060432 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.impl.cache;
 
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.map.IMap;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -36,7 +37,7 @@ public class AlternateIdCacheConfig extends HazelcastCacheConfig {
      *
      * @return configured map of cm handle id by alternate id
      */
-    @Bean
+    @Bean("cmHandleIdPerAlternateId")
     public IMap<String, String> cmHandleIdPerAlternateId() {
         return getOrCreateHazelcastInstance(cmHandleIdPerAlternateIdMapConfig).getMap("cmHandleIdPerAlternateId");
     }
index 95ae011..6294c62 100644 (file)
@@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.cache;
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.map.IMap;
 import java.util.Map;
-import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
 import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
index aa7e261..2a313da 100644 (file)
@@ -44,6 +44,7 @@ public class AlternateIdChecker {
     }
 
     private final InventoryPersistence inventoryPersistence;
+
     @Qualifier("cmHandleIdPerAlternateId")
     private final IMap<String, String> cmHandleIdPerAlternateId;
 
index e56e6c2..59689ba 100644 (file)
@@ -81,6 +81,7 @@ public class CmHandleRegistrationService {
     private final IMap<String, Object> moduleSyncStartedOnCmHandles;
     private final TrustLevelManager trustLevelManager;
     private final AlternateIdChecker alternateIdChecker;
+
     @Qualifier("cmHandleIdPerAlternateId")
     private final IMap<String, String> cmHandleIdPerAlternateId;
 
index 49261f6..1c09309 100644 (file)
@@ -21,8 +21,6 @@
 
 package org.onap.cps.ncmp.impl.inventory.sync;
 
-import static org.onap.cps.ncmp.impl.cache.CpsAndNcmpLockConfig.MODULE_SYNC_WORK_QUEUE_LOCK_NAME;
-
 import com.hazelcast.map.IMap;
 import java.util.Collection;
 import java.util.HashSet;
@@ -44,13 +42,12 @@ public class ModuleSyncWatchdog {
     private final BlockingQueue<String> moduleSyncWorkQueue;
     private final IMap<String, Object> moduleSyncStartedOnCmHandles;
     private final ModuleSyncTasks moduleSyncTasks;
-    @Qualifier("cpsAndNcmpLock")
-    private final IMap<String, String> cpsAndNcmpLock;
+    @Qualifier("cpsCommonLocks") private final IMap<String, String> cpsCommonLocks;
     private final ReadinessManager readinessManager;
 
     private static final int MODULE_SYNC_BATCH_SIZE = 300;
     private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
-
+    private static final String MODULE_SYNC_WORK_QUEUE_COMMON_LOCK_NAME = "workQueueLock";
 
     /**
      * Check DB for any cm handles in 'ADVISED' state.
@@ -86,12 +83,12 @@ public class ModuleSyncWatchdog {
     }
 
     /**
-     * Populate work queue with advised cm handles from db.
+     * Populate module sync work queue with advised cm handles from db.
      * This method is made public for (integration) testing purposes.
      * So it can be tested without the queue being emptied immediately as the main public method does.
      */
     public void populateWorkQueueIfNeeded() {
-        if (moduleSyncWorkQueue.isEmpty() && cpsAndNcmpLock.tryLock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME)) {
+        if (moduleSyncWorkQueue.isEmpty() && cpsCommonLocks.tryLock(MODULE_SYNC_WORK_QUEUE_COMMON_LOCK_NAME)) {
             log.debug("Lock acquired by thread : {}", Thread.currentThread().getName());
             try {
                 populateWorkQueue();
@@ -99,7 +96,7 @@ public class ModuleSyncWatchdog {
                     setPreviouslyLockedCmHandlesToAdvised();
                 }
             } finally {
-                cpsAndNcmpLock.unlock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME);
+                cpsCommonLocks.unlock(MODULE_SYNC_WORK_QUEUE_COMMON_LOCK_NAME);
                 log.debug("Lock released by thread : {}", Thread.currentThread().getName());
             }
         }
index 23fa917..6981b07 100644 (file)
@@ -24,7 +24,7 @@ import com.hazelcast.config.MapConfig;
 import com.hazelcast.config.QueueConfig;
 import com.hazelcast.map.IMap;
 import java.util.concurrent.BlockingQueue;
-import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
index 3fc4b05..8a20f47 100644 (file)
@@ -23,8 +23,8 @@ package org.onap.cps.ncmp.impl.inventory.trustlevel;
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.config.NearCacheConfig;
 import com.hazelcast.map.IMap;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
 import org.onap.cps.ncmp.api.inventory.models.TrustLevel;
-import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
index 99a2829..893d94e 100644 (file)
@@ -28,6 +28,7 @@ import org.onap.cps.api.CpsDataService;
 import org.onap.cps.api.CpsDataspaceService;
 import org.onap.cps.api.CpsModuleService;
 import org.onap.cps.init.AbstractModelLoader;
+import org.onap.cps.init.ModelLoaderCoordinatorLock;
 import org.onap.cps.init.actuator.ReadinessManager;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Service;
@@ -42,19 +43,25 @@ public class CmDataSubscriptionModelLoader extends AbstractModelLoader {
     private static final String ANCHOR_NAME = "cm-data-job-subscriptions";
     private static final String REGISTRY_DATA_NODE_NAME = "dataJob";
 
-    public CmDataSubscriptionModelLoader(final CpsDataspaceService cpsDataspaceService,
+    public CmDataSubscriptionModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+                                         final CpsDataspaceService cpsDataspaceService,
                                          final CpsModuleService cpsModuleService,
                                          final CpsAnchorService cpsAnchorService,
                                          final CpsDataService cpsDataService,
                                          final ReadinessManager readinessManager) {
-        super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager);
+        super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+            readinessManager);
     }
 
     @Override
     public void onboardOrUpgradeModel() {
-        log.info("Model Loader #3 Started: NCMP CM Data Notification Subscription Models");
-        onboardSubscriptionModels();
-        log.info("Model Loader #3 Completed");
+        if (isMaster) {
+            log.info("Model Loader #3 Started: NCMP CM Data Notification Subscription Models");
+            onboardSubscriptionModels();
+            log.info("Model Loader #3 Completed");
+        } else {
+            logMessageForNonMasterInstance();
+        }
     }
 
     private void onboardSubscriptionModels() {
index f1d89a0..b64612b 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.cps.api.CpsDataService;
 import org.onap.cps.api.CpsDataspaceService;
 import org.onap.cps.api.CpsModuleService;
 import org.onap.cps.init.AbstractModelLoader;
+import org.onap.cps.init.ModelLoaderCoordinatorLock;
 import org.onap.cps.init.actuator.ReadinessManager;
 import org.onap.cps.ncmp.utils.events.NcmpInventoryModelOnboardingFinishedEvent;
 import org.springframework.beans.factory.annotation.Value;
@@ -55,32 +56,39 @@ public class InventoryModelLoader extends AbstractModelLoader {
      * Creates a new {@code InventoryModelLoader} instance responsible for onboarding or upgrading
      * the NCMP inventory model schema sets and managing readiness state during migration.
      */
-    public InventoryModelLoader(final CpsDataspaceService cpsDataspaceService,
+    public InventoryModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+                                final CpsDataspaceService cpsDataspaceService,
                                 final CpsModuleService cpsModuleService,
                                 final CpsAnchorService cpsAnchorService,
                                 final CpsDataService cpsDataService,
                                 final ApplicationEventPublisher applicationEventPublisher,
                                 final ReadinessManager readinessManager) {
-        super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager);
+        super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+            readinessManager);
         this.applicationEventPublisher = applicationEventPublisher;
     }
 
     @Override
     public void onboardOrUpgradeModel() {
-        log.info("Model Loader #2 Started: NCMP Inventory Models");
-        final String schemaToInstall = newRevisionEnabled ? NEW_INVENTORY_SCHEMA_SET_NAME : PREVIOUS_SCHEMA_SET_NAME;
-        final String moduleRevision = getModuleRevision(schemaToInstall);
+        if (isMaster) {
+            log.info("Model Loader #2 Started: NCMP Inventory Models");
+            final String schemaToInstall =
+                newRevisionEnabled ? NEW_INVENTORY_SCHEMA_SET_NAME : PREVIOUS_SCHEMA_SET_NAME;
+            final String moduleRevision = getModuleRevision(schemaToInstall);
 
-        if (isModuleRevisionInstalled(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, INVENTORY_YANG_MODULE_NAME,
+            if (isModuleRevisionInstalled(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, INVENTORY_YANG_MODULE_NAME,
                 moduleRevision)) {
-            log.info("Revision {} is already installed.", moduleRevision);
-        } else if (newRevisionEnabled && doesAnchorExist(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR)) {
-            upgradeAndMigrateInventoryModel();
+                log.info("Revision {} is already installed.", moduleRevision);
+            } else if (newRevisionEnabled && doesAnchorExist(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR)) {
+                upgradeAndMigrateInventoryModel();
+            } else {
+                installInventoryModel(schemaToInstall);
+            }
+            applicationEventPublisher.publishEvent(new NcmpInventoryModelOnboardingFinishedEvent(this));
+            log.info("Model Loader #2 Completed");
         } else {
-            installInventoryModel(schemaToInstall);
+            logMessageForNonMasterInstance();
         }
-        applicationEventPublisher.publishEvent(new NcmpInventoryModelOnboardingFinishedEvent(this));
-        log.info("Model Loader #2 Completed");
     }
 
     private void installInventoryModel(final String schemaSetName) {
index dd1c1fb..a26ba11 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2025 Nordix Foundation
+ *  Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2022 Bell Canada
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,11 +27,11 @@ import ch.qos.logback.classic.spi.ILoggingEvent
 import ch.qos.logback.core.read.ListAppender
 import com.hazelcast.map.IMap
 import org.onap.cps.init.actuator.ReadinessManager
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
 import org.slf4j.LoggerFactory
+import spock.lang.Specification
 
 import java.util.concurrent.ArrayBlockingQueue
-import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import spock.lang.Specification
 
 class ModuleSyncWatchdogSpec extends Specification {
 
@@ -45,11 +45,11 @@ class ModuleSyncWatchdogSpec extends Specification {
 
     def mockModuleSyncTasks = Mock(ModuleSyncTasks)
 
-    def mockCpsAndNcmpLock = Mock(IMap<String,String>)
+    def mockCpsCommonLocks = Mock(IMap<String,String>)
 
     def mockReadinessManager = Mock(ReadinessManager)
 
-    def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock, mockReadinessManager)
+    def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsCommonLocks, mockReadinessManager)
 
     def logAppender = Spy(ListAppender<ILoggingEvent>)
 
@@ -84,13 +84,13 @@ class ModuleSyncWatchdogSpec extends Specification {
         and: 'module sync utilities returns no failed (locked) cm handles'
             mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
         and: 'the work queue can be locked'
-            mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
+            mockCpsCommonLocks.tryLock('workQueueLock') >> true
         when: ' module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it performs #expectedNumberOfTaskExecutions tasks'
             expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(*_)
         and: 'the executing thread is unlocked'
-            1 * mockCpsAndNcmpLock.unlock('workQueueLock')
+            1 * mockCpsCommonLocks.unlock('workQueueLock')
         where: 'the following parameter are used'
             scenario              | numberOfAdvisedCmHandles                                          || expectedNumberOfTaskExecutions
             'none at all'         | 0                                                                 || 0
@@ -107,7 +107,7 @@ class ModuleSyncWatchdogSpec extends Specification {
         and: 'module sync utilities returns a advise cm handles'
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'the work queue can be locked'
-            mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
+            mockCpsCommonLocks.tryLock('workQueueLock') >> true
         when: ' module sync is started'
             objectUnderTest.moduleSyncAdvisedCmHandles()
         then: 'it performs one task'
@@ -120,7 +120,7 @@ class ModuleSyncWatchdogSpec extends Specification {
         and: 'module sync utilities returns an advised cm handle'
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'the work queue can be locked'
-            mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
+            mockCpsCommonLocks.tryLock('workQueueLock') >> true
         and: 'the semaphore cache indicates the cm handle is already being processed'
             mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started'
         when: 'module sync is started'
@@ -158,13 +158,13 @@ class ModuleSyncWatchdogSpec extends Specification {
         and: 'module sync utilities returns an advised cm handle'
             mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
         and: 'can be locked is : #canLock'
-            mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock
+            mockCpsCommonLocks.tryLock('workQueueLock') >> canLock
         when: 'attempt to populate the work queue'
             objectUnderTest.populateWorkQueueIfNeeded()
         then: 'the queue remains empty is #expectQueueRemainsEmpty'
             assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty
         and: 'unlock is called only when thread is able to enter the critical section'
-            expectedInvocationToUnlock * mockCpsAndNcmpLock.unlock('workQueueLock')
+            expectedInvocationToUnlock * mockCpsCommonLocks.unlock('workQueueLock')
         where: 'the following lock states are applied'
             canLock || expectQueueRemainsEmpty || expectedInvocationToUnlock
             false   || true                    || 0
index 1cecbbe..538cdaa 100644 (file)
 
 package org.onap.cps.ncmp.impl.inventory.sync
 
-import com.hazelcast.config.Config
+
 import com.hazelcast.core.Hazelcast
 import com.hazelcast.map.IMap
-import java.util.concurrent.BlockingQueue
-import java.util.concurrent.TimeUnit
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.context.ContextConfiguration
 import spock.lang.Specification
 import spock.util.concurrent.PollingConditions
 
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.TimeUnit
+
 @SpringBootTest
 @ContextConfiguration(classes = [SynchronizationCacheConfig])
 class SynchronizationCacheConfigSpec extends Specification {
@@ -90,20 +91,6 @@ class SynchronizationCacheConfigSpec extends Specification {
             assert !hzConfig.join.kubernetesConfig.enabled
     }
 
-    def 'Verify network config'() {
-        given: 'Synchronization config object and test configuration'
-            def objectUnderTest = new SynchronizationCacheConfig()
-            def testConfig = new Config()
-        when: 'kubernetes properties are enabled'
-            objectUnderTest.cacheKubernetesEnabled = true
-            objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
-        and: 'method called to update the discovery mode'
-            objectUnderTest.updateDiscoveryMode(testConfig)
-        then: 'applied properties are reflected'
-            assert testConfig.networkConfig.join.kubernetesConfig.enabled
-            assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
-    }
-
     def 'Time to Live Verify for Module Sync Semaphore'() {
         when: 'the key is inserted with a TTL'
             moduleSyncStartedOnCmHandles.put('testKeyModuleSync', 'toBeExpired' as Object, 100, TimeUnit.MILLISECONDS)
index 20b1c1c..aab2f3c 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
 
 package org.onap.cps.ncmp.impl.inventory.trustlevel
 
-import com.hazelcast.config.Config
 import com.hazelcast.core.Hazelcast
 import com.hazelcast.map.IMap
 import org.onap.cps.ncmp.api.inventory.models.TrustLevel
@@ -94,18 +93,4 @@ class TrustLevelCacheConfigSpec extends Specification {
             assert !trustLevelPerCmHandlePluginCacheConfig.join.kubernetesConfig.enabled
     }
 
-    def 'Verify network config'() {
-        given: 'Synchronization config object and test configuration'
-            def objectUnderTest = new TrustLevelCacheConfig()
-            def testConfig = new Config()
-        when: 'kubernetes properties are enabled'
-            objectUnderTest.cacheKubernetesEnabled = true
-            objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
-        and: 'method called to update the discovery mode'
-            objectUnderTest.updateDiscoveryMode(testConfig)
-        then: 'applied properties are reflected'
-            assert testConfig.networkConfig.join.kubernetesConfig.enabled
-            assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
-    }
-
 }
index 4d005be..425dfb8 100644 (file)
@@ -28,10 +28,9 @@ import org.onap.cps.api.CpsDataService
 import org.onap.cps.api.CpsDataspaceService
 import org.onap.cps.api.CpsModuleService
 import org.onap.cps.api.model.Dataspace
+import org.onap.cps.init.ModelLoaderCoordinatorLock
 import org.onap.cps.init.actuator.ReadinessManager
 import org.slf4j.LoggerFactory
-import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
 import org.springframework.context.annotation.AnnotationConfigApplicationContext
 import spock.lang.Specification
 
@@ -39,12 +38,13 @@ import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NA
 
 class CmDataSubscriptionModelLoaderSpec extends Specification {
 
+    def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
     def mockCpsDataspaceService = Mock(CpsDataspaceService)
     def mockCpsModuleService = Mock(CpsModuleService)
     def mockCpsDataService = Mock(CpsDataService)
     def mockCpsAnchorService = Mock(CpsAnchorService)
     def mockReadinessManager = Mock(ReadinessManager)
-    def objectUnderTest = new CmDataSubscriptionModelLoader(mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
+    def objectUnderTest = new CmDataSubscriptionModelLoader(mockModelLoaderCoordinatorLock, mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
 
     def applicationContext = new AnnotationConfigApplicationContext()
 
@@ -53,6 +53,7 @@ class CmDataSubscriptionModelLoaderSpec extends Specification {
     def loggingListAppender
 
     void setup() {
+        objectUnderTest.isMaster = true
         expectedYangResourcesToContentMap = objectUnderTest.mapYangResourcesToContent('cm-data-job-subscriptions@2025-09-03.yang')
         logger.setLevel(Level.DEBUG)
         loggingListAppender = new ListAppender()
@@ -69,8 +70,8 @@ class CmDataSubscriptionModelLoaderSpec extends Specification {
     def 'Onboard subscription model via application ready event.'() {
         given: 'dataspace is ready for use'
             mockCpsDataspaceService.getDataspace(NCMP_DATASPACE_NAME) >> new Dataspace('')
-        when: 'the application is ready'
-            objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
+        when: 'the model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
         then: 'the module service to create schema set is called once'
             1 * mockCpsModuleService.createSchemaSet(NCMP_DATASPACE_NAME, 'cm-data-job-subscriptions', expectedYangResourcesToContentMap)
         and: 'the admin service to create an anchor set is called once'
@@ -79,4 +80,13 @@ class CmDataSubscriptionModelLoaderSpec extends Specification {
             1 * mockCpsDataService.saveData(NCMP_DATASPACE_NAME, 'cm-data-job-subscriptions', '{"dataJob":{}}', _)
     }
 
+    def 'Onboarding is skipped when instance is not master'() {
+        given: 'instance is not master'
+            objectUnderTest.isMaster = false
+        when: 'the model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
+        then: 'the onboard/upgrade methods are not executed'
+            0 * mockCpsModuleService.createSchemaSet(*_)
+    }
+
 }
index 5f42202..b000099 100644 (file)
@@ -30,10 +30,10 @@ import org.onap.cps.api.CpsModuleService
 import org.onap.cps.api.exceptions.AnchorNotFoundException
 import org.onap.cps.api.model.Dataspace
 import org.onap.cps.api.model.ModuleDefinition
+import org.onap.cps.init.ModelLoaderCoordinatorLock
 import org.onap.cps.init.actuator.ReadinessManager
 import org.slf4j.LoggerFactory
 import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
 import org.springframework.context.ApplicationEventPublisher
 import org.springframework.context.annotation.AnnotationConfigApplicationContext
 import spock.lang.Specification
@@ -43,13 +43,14 @@ import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY
 
 class InventoryModelLoaderSpec extends Specification {
 
+    def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
     def mockCpsAdminService = Mock(CpsDataspaceService)
     def mockCpsModuleService = Mock(CpsModuleService)
     def mockCpsDataService = Mock(CpsDataService)
     def mockCpsAnchorService = Mock(CpsAnchorService)
     def mockApplicationEventPublisher = Mock(ApplicationEventPublisher)
     def mockReadinessManager = Mock(ReadinessManager)
-    def objectUnderTest = new InventoryModelLoader(mockCpsAdminService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockApplicationEventPublisher, mockReadinessManager)
+    def objectUnderTest = new InventoryModelLoader(mockModelLoaderCoordinatorLock, mockCpsAdminService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockApplicationEventPublisher, mockReadinessManager)
 
     def applicationContext = new AnnotationConfigApplicationContext()
 
@@ -59,6 +60,7 @@ class InventoryModelLoaderSpec extends Specification {
     def loggingListAppender
 
     void setup() {
+        objectUnderTest.isMaster = true
         expectedPreviousYangResourceToContentMap = objectUnderTest.mapYangResourcesToContent('dmi-registry@2024-02-23.yang')
         expectedNewYangResourceToContentMap = objectUnderTest.mapYangResourcesToContent('dmi-registry@2025-07-22.yang')
         objectUnderTest.newRevisionEnabled = true
@@ -114,15 +116,27 @@ class InventoryModelLoaderSpec extends Specification {
             assert loggingListAppender.list.any { it.message.contains("Inventory upgraded successfully") }
     }
 
+    def 'Onboarding is skipped when instance is not master'() {
+        given: 'instance is not master'
+            objectUnderTest.isMaster = false
+        when: 'the model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
+        then: 'the onboard/upgrade methods are not executed'
+            0 * mockCpsModuleService.createSchemaSet(*_)
+    }
+
+
     def 'Skip upgrade model revision when new revision already installed'() {
         given: 'the anchor exists and the new model revision is already installed'
             mockCpsAnchorService.getAnchor(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR) >> {}
-            mockCpsModuleService.getModuleDefinitionsByAnchorAndModule(_, _, _, _) >> [new ModuleDefinition('', '', '')]
-        when: 'the inventory model loader is triggered'
+            mockCpsModuleService.getModuleDefinitionsByAnchorAndModule(*_) >> [new ModuleDefinition('', '', '')]
+        when: 'the model loader is started'
             objectUnderTest.onboardOrUpgradeModel()
         then: 'no new schema set is created'
-            0 * mockCpsModuleService.createSchemaSet(_, _, _)
+            0 * mockCpsModuleService.createSchemaSet(*_)
         and: 'a log message confirms the revision is already installed'
             assert loggingListAppender.list.any { it.message.contains("already installed") }
     }
+
+
 }
index 0736fc5..a1c8d12 100755 (executable)
@@ -1,5 +1,5 @@
 #  ============LICENSE_START=======================================================
-#  Copyright (C) 2021 Nordix Foundation
+#  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -18,3 +18,4 @@
 
 config.stopBubbling = true
 lombok.addLombokGeneratedAnnotation = true
+lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
index 98cbb8c..c7b780d 100644 (file)
       <groupId>org.onap.cps</groupId>
       <artifactId>cps-path-parser</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.hazelcast</groupId>
+      <artifactId>hazelcast-spring</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-model-api</artifactId>
@@ -18,7 +18,7 @@
  *  ============LICENSE_END=========================================================
  */
 
-package org.onap.cps.ncmp.impl.cache;
+package org.onap.cps.impl.cache;
 
 import com.hazelcast.config.MapConfig;
 import com.hazelcast.map.IMap;
@@ -26,23 +26,19 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 @Configuration
-public class CpsAndNcmpLockConfig extends HazelcastCacheConfig {
+public class CpsCommonLocksConfig extends HazelcastCacheConfig {
 
-    // Lock names for different use cases ( to be used as cpsAndNcmpLock keys)
-    public static final String MODULE_SYNC_WORK_QUEUE_LOCK_NAME = "workQueueLock";
-
-    private static final MapConfig cpsAndNcmpLockMapConfig = createGenericMapConfig("cpsAndNcmpLockConfig");
+    private static final MapConfig cpsCommonLocksMapConfig = createGenericMapConfig("cpsCommonLocksMapConfig");
 
     /**
-     * Distributed instance used for locking purpose for various use cases in cps-and-ncmp.
+     * Distributed instance used for locking purpose for various use cases in cps and dependant services.
      * The key of the map entry is name of the lock and should be based on the use case we are locking.
      *
-     * @return configured map of lock object to have distributed coordination.
+     * @return configured map of lock objects to have distributed coordination.
      */
-    @Bean
-    public IMap<String, String> cpsAndNcmpLock() {
-        return getOrCreateHazelcastInstance(cpsAndNcmpLockMapConfig).getMap("cpsAndNcmpLock");
+    @Bean("cpsCommonLocks")
+    public IMap<String, String> cpsCommonLocks() {
+        return getOrCreateHazelcastInstance(cpsCommonLocksMapConfig).getMap("cpsCommonLocks");
     }
 
-
 }
@@ -18,7 +18,7 @@
  *  ============LICENSE_END=========================================================
  */
 
-package org.onap.cps.ncmp.impl.cache;
+package org.onap.cps.impl.cache;
 
 import com.hazelcast.config.Config;
 import com.hazelcast.config.MapConfig;
@@ -51,7 +51,6 @@ public class HazelcastCacheConfig {
 
     protected HazelcastInstance getOrCreateHazelcastInstance(final NamedConfig namedConfig) {
         return Hazelcast.getOrCreateHazelcastInstance(defineInstanceConfig(instanceConfigName, namedConfig));
-
     }
 
     private Config defineInstanceConfig(final String instanceConfigName, final NamedConfig namedConfig) {
@@ -89,8 +88,6 @@ public class HazelcastCacheConfig {
         return mapConfig;
     }
 
-
-
     protected static QueueConfig createQueueConfig(final String configName) {
         final QueueConfig commonQueueConfig = new QueueConfig(configName);
         commonQueueConfig.setBackupCount(1);
index 563c266..47f4701 100644 (file)
@@ -49,12 +49,15 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
 @RequiredArgsConstructor
 public abstract class AbstractModelLoader implements ModelLoader {
 
+    protected final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock;
     protected final CpsDataspaceService cpsDataspaceService;
     private final CpsModuleService cpsModuleService;
     protected final CpsAnchorService cpsAnchorService;
     protected final CpsDataService cpsDataService;
     protected final ReadinessManager readinessManager;
 
+    protected boolean isMaster = false;
+
     private final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper());
 
     private static final int EXIT_CODE_ON_ERROR = 1;
@@ -62,9 +65,10 @@ public abstract class AbstractModelLoader implements ModelLoader {
     @Override
     public void onApplicationEvent(final ApplicationReadyEvent applicationReadyEvent) {
         try {
+            checkIfThisInstanceIsMaster();
             onboardOrUpgradeModel();
         } catch (final Exception exception) {
-            log.error("Exiting application due to failure in onboarding model: {} ",
+            log.error("Exiting application due to failure in onboarding model: {}",
                     exception.getMessage());
             exitApplication(applicationReadyEvent);
         } finally {
@@ -206,6 +210,9 @@ public abstract class AbstractModelLoader implements ModelLoader {
         return !moduleDefinitions.isEmpty();
     }
 
+    public void logMessageForNonMasterInstance() {
+        log.info("This instance is not model loader master, skipping model loader for: {}", getName());
+    }
 
     Map<String, String> mapYangResourcesToContent(final String... resourceNames) {
         final Map<String, String> yangResourceContentByName = new HashMap<>();
@@ -215,6 +222,15 @@ public abstract class AbstractModelLoader implements ModelLoader {
         return yangResourceContentByName;
     }
 
+    void checkIfThisInstanceIsMaster() {
+        isMaster = isMaster || modelLoaderCoordinatorLock.tryLock();
+        if (isMaster) {
+            log.info("This instance is model loader master");
+        } else {
+            log.info("Another instance is model loader master");
+        }
+    }
+
     private String getFileContentAsString(final String fileName) {
         try (final InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName)) {
             return new String(inputStream != null ? inputStream.readAllBytes() : null, StandardCharsets.UTF_8);
index f8c8591..a9f2e82 100644 (file)
@@ -41,19 +41,25 @@ public class CpsNotificationSubscriptionModelLoader extends AbstractModelLoader
     private static final String CPS_DATASPACE_NAME = "CPS-Admin";
     private static final String REGISTRY_DATANODE_NAME = "dataspaces";
 
-    public CpsNotificationSubscriptionModelLoader(final CpsDataspaceService cpsDataspaceService,
+    public CpsNotificationSubscriptionModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+                                                  final CpsDataspaceService cpsDataspaceService,
                                                   final CpsModuleService cpsModuleService,
                                                   final CpsAnchorService cpsAnchorService,
                                                   final CpsDataService cpsDataService,
                                                   final ReadinessManager readinessManager) {
-        super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager);
+        super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+            readinessManager);
     }
 
     @Override
     public void onboardOrUpgradeModel() {
-        log.info("Model Loader #1 Started: CPS Data Notification Subscription Models");
-        onboardSubscriptionModels();
-        log.info("Model Loader #1 Completed");
+        if (isMaster) {
+            log.info("Model Loader #1 Started: CPS Data Notification Subscription Models");
+            onboardSubscriptionModels();
+            log.info("Model Loader #1 Completed");
+        } else {
+            logMessageForNonMasterInstance();
+        }
     }
 
     private void onboardSubscriptionModels() {
diff --git a/cps-service/src/main/java/org/onap/cps/init/ModelLoaderCoordinatorEnd.java b/cps-service/src/main/java/org/onap/cps/init/ModelLoaderCoordinatorEnd.java
new file mode 100644 (file)
index 0000000..baf3fa2
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init;
+
+import static org.springframework.core.Ordered.LOWEST_PRECEDENCE;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.api.CpsAnchorService;
+import org.onap.cps.api.CpsDataService;
+import org.onap.cps.api.CpsDataspaceService;
+import org.onap.cps.api.CpsModuleService;
+import org.onap.cps.init.actuator.ReadinessManager;
+import org.onap.cps.utils.Sleeper;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@Order(LOWEST_PRECEDENCE)
+public class ModelLoaderCoordinatorEnd extends AbstractModelLoader {
+
+    final Sleeper sleeper;
+
+    /**
+     * Constructor.
+     *
+     * @param modelLoaderCoordinatorLock the modelLoaderCoordinatorLock
+     * @param cpsDataspaceService the cpsDataspaceService
+     * @param cpsModuleService the cpsModuleService
+     * @param cpsAnchorService the cpsAnchorService
+     * @param cpsDataService the cpsDataService
+     * @param readinessManager the readinessManager
+     * @param sleeper the sleeper
+     */
+    public ModelLoaderCoordinatorEnd(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+                                     final CpsDataspaceService cpsDataspaceService,
+                                     final CpsModuleService cpsModuleService,
+                                     final CpsAnchorService cpsAnchorService,
+                                     final CpsDataService cpsDataService,
+                                     final ReadinessManager readinessManager,
+                                     final Sleeper sleeper) {
+        super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+            readinessManager);
+        this.sleeper = sleeper;
+    }
+
+    @Override
+    public void onboardOrUpgradeModel() {
+        log.info("Model Loader #999 Started: Coordinator End, check model loaders are completed");
+        if (isMaster) {
+            releaseLock();
+            log.info("This instance is model loader master. Model loading completed");
+        } else {
+            log.info("Wait for model loader master to finish");
+            waitForMasterToFinish();
+        }
+        log.info("Model Loader #999 Completed");
+    }
+
+    private void releaseLock() {
+        modelLoaderCoordinatorLock.forceUnlock();
+        log.info("Model loading (on master) finished");
+    }
+
+    private void waitForMasterToFinish() {
+        while (modelLoaderCoordinatorLock.isLocked()) {
+            log.info("Still waiting for model loader master to finish");
+            try {
+                sleeper.haveALittleRest(100);
+            } catch (final InterruptedException e) {
+                log.warn("I cannot sleep (ignored)");
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+}
diff --git a/cps-service/src/main/java/org/onap/cps/init/ModelLoaderCoordinatorLock.java b/cps-service/src/main/java/org/onap/cps/init/ModelLoaderCoordinatorLock.java
new file mode 100644 (file)
index 0000000..dfccc0d
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init;
+
+import com.hazelcast.map.IMap;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class ModelLoaderCoordinatorLock {
+
+    @Qualifier("cpsCommonLocks")
+    private final IMap<String, String> cpsCommonLocks;
+
+    private static final String MODULE_LOADER_LOCK_NAME = "modelLoaderLock";
+
+    boolean tryLock() {
+        return cpsCommonLocks.tryLock(MODULE_LOADER_LOCK_NAME);
+    }
+
+    boolean isLocked() {
+        return cpsCommonLocks.isLocked(MODULE_LOADER_LOCK_NAME);
+    }
+
+    void forceUnlock() {
+        cpsCommonLocks.forceUnlock(MODULE_LOADER_LOCK_NAME);
+    }
+
+}
diff --git a/cps-service/src/main/java/org/onap/cps/utils/Sleeper.java b/cps-service/src/main/java/org/onap/cps/utils/Sleeper.java
new file mode 100644 (file)
index 0000000..682c034
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.utils;
+
+import java.util.concurrent.TimeUnit;
+import org.springframework.stereotype.Service;
+
+/**
+ * This class is to extract out sleep functionality so the interrupted exception handling can
+ * be covered with a test (e.g. using spy on Sleeper) and help to get too 100% code coverage.
+ */
+@Service
+public class Sleeper {
+    public void haveALittleRest(final int timeInMilliSeconds) throws InterruptedException {
+        TimeUnit.MILLISECONDS.sleep(timeInMilliSeconds);
+    }
+}
diff --git a/cps-service/src/test/groovy/org/onap/cps/impl/cache/CpsCommonLocksConfigSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/impl/cache/CpsCommonLocksConfigSpec.groovy
new file mode 100644 (file)
index 0000000..f637116
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.impl.cache
+
+import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.map.IMap
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+
+@SpringBootTest
+@ContextConfiguration(classes = [CpsCommonLocksConfig])
+class CpsCommonLocksConfigSpec extends Specification {
+
+    @Autowired
+    IMap<String, String> cpsCommonLocks
+
+    def cleanupSpec() {
+        Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').shutdown()
+    }
+
+    def 'Hazelcast common locks'() {
+        expect: 'system is able to create an instance of the common locks cache'
+            assert null != cpsCommonLocks
+        and: 'there is at least 1 instance'
+            assert Hazelcast.allHazelcastInstances.size() > 0
+        and: 'Hazelcast cache instance for common locks is present'
+            assert Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').getMap('cpsCommonLocks') != null
+    }
+
+    def 'Verify network config'() {
+        given: 'Synchronization config object and test configuration'
+            def objectUnderTest = new CpsCommonLocksConfig()
+            def testConfig = new Config()
+        when: 'kubernetes properties are enabled'
+            objectUnderTest.cacheKubernetesEnabled = true
+            objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
+        and: 'method called to update the discovery mode'
+            objectUnderTest.updateDiscoveryMode(testConfig)
+        then: 'applied properties are reflected'
+            assert testConfig.networkConfig.join.kubernetesConfig.enabled
+            assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
+    }
+}
@@ -18,7 +18,7 @@
  *  ============LICENSE_END=========================================================
  */
 
-package org.onap.cps.ncmp.impl.cache
+package org.onap.cps.impl.cache
 
 import com.hazelcast.core.Hazelcast
 import spock.lang.Specification
@@ -52,10 +52,10 @@ class HazelcastCacheConfigSpec extends Specification {
                 assert result.config.setConfigs.values()[0].name == 'my set config'
             }
         where: 'the following configs are used'
-            scenario       | config                                                    || expectMapConfig | expectQueueConfig | expectSetConfig
-            'Map Config'   | HazelcastCacheConfig.createGenericMapConfig('my map config') || true | false | false
-            'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false           | true              | false
-            'Set Config'   | HazelcastCacheConfig.createSetConfig('my set config')     || false           | false             | true
+            scenario       | config                                                       || expectMapConfig | expectQueueConfig | expectSetConfig
+            'Map Config'   | HazelcastCacheConfig.createGenericMapConfig('my map config') || true            | false             | false
+            'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config')    || false           | true              | false
+            'Set Config'   | HazelcastCacheConfig.createSetConfig('my set config')        || false           | false             | true
     }
 
 }
index 097d79f..1620b1a 100644 (file)
@@ -28,35 +28,35 @@ import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDataService
 import org.onap.cps.api.CpsDataspaceService
 import org.onap.cps.api.CpsModuleService
+import org.onap.cps.api.exceptions.AlreadyDefinedException
 import org.onap.cps.api.exceptions.AnchorNotFoundException
 import org.onap.cps.api.exceptions.DuplicatedYangResourceException
 import org.onap.cps.api.exceptions.ModelOnboardingException
 import org.onap.cps.api.model.ModuleDefinition
 import org.onap.cps.api.parameters.CascadeDeleteAllowed
-import org.onap.cps.api.exceptions.AlreadyDefinedException
 import org.onap.cps.init.actuator.ReadinessManager
 import org.slf4j.LoggerFactory
 import org.springframework.boot.SpringApplication
 import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
 import org.springframework.context.annotation.AnnotationConfigApplicationContext
 import spock.lang.Specification
 
 class AbstractModelLoaderSpec extends Specification {
 
+    def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
     def mockCpsDataspaceService = Mock(CpsDataspaceService)
     def mockCpsModuleService = Mock(CpsModuleService)
     def mockCpsDataService = Mock(CpsDataService)
     def mockCpsAnchorService = Mock(CpsAnchorService)
     def mockReadinessManager = Mock(ReadinessManager)
-    def objectUnderTest = Spy(new TestModelLoader(mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager))
+    def objectUnderTest = Spy(new TestModelLoader(mockModelLoaderCoordinatorLock, mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager))
 
     def applicationContext = new AnnotationConfigApplicationContext()
 
     def logger = (Logger) LoggerFactory.getLogger(AbstractModelLoader)
     def loggingListAppender
 
-    void setup() {
+    def setup() {
         logger.setLevel(Level.DEBUG)
         loggingListAppender = new ListAppender()
         logger.addAppender(loggingListAppender)
@@ -64,7 +64,7 @@ class AbstractModelLoaderSpec extends Specification {
         applicationContext.refresh()
     }
 
-    void cleanup() {
+    def cleanup() {
         logger.detachAndStopAllAppenders()
         applicationContext.close()
         loggingListAppender.stop()
@@ -102,7 +102,7 @@ class AbstractModelLoaderSpec extends Specification {
         then: 'the exception is ignored i.e. no exception thrown up'
             noExceptionThrown()
         and: 'the exception is ignored, and a log message is produced'
-            assertLogContains('Dataspace already exists')
+            assert logContains('Dataspace already exists')
     }
 
     def 'Creating a dataspace handles other exception.'() {
@@ -130,7 +130,7 @@ class AbstractModelLoaderSpec extends Specification {
             objectUnderTest.createSchemaSet('some dataspace','some schema set','cps-notification-subscriptions@2024-07-03.yang')
         then: 'exception is ignored, and correct exception message is logged'
             noExceptionThrown()
-            assertLogContains('Ignoring yang resource duplication exception. Assuming model was created by another instance')
+            assert logContains('Ignoring yang resource duplication exception. Assuming model was created by another instance')
     }
 
     def 'Creating a schema set handles already defined exception.'() {
@@ -140,7 +140,7 @@ class AbstractModelLoaderSpec extends Specification {
             objectUnderTest.createSchemaSet('some dataspace','new name','cps-notification-subscriptions@2024-07-03.yang')
         then: 'the exception is ignored, and a log message is produced'
             noExceptionThrown()
-            assertLogContains('Creating new schema set failed as schema set already exists')
+            assert logContains('Creating new schema set failed as schema set already exists')
     }
 
     def 'Creating a schema set from a non-existing YANG file.'() {
@@ -166,7 +166,7 @@ class AbstractModelLoaderSpec extends Specification {
             objectUnderTest.createAnchor('some dataspace','some schema set','new name')
         then: 'the exception is ignored, and a log message is produced'
             noExceptionThrown()
-            assertLogContains('Creating new anchor failed as anchor already exists')
+            assert logContains('Creating new anchor failed as anchor already exists')
     }
 
     def 'Creating an anchor handles other exceptions.'() {
@@ -194,7 +194,7 @@ class AbstractModelLoaderSpec extends Specification {
             objectUnderTest.createTopLevelDataNode('dataspace','anchor','new node')
         then: 'the exception is ignored, and a log message is produced'
             noExceptionThrown()
-            assertLogContains('failed as data node already exists')
+            assert logContains('failed as data node already exists')
     }
 
     def 'Create a top-level node with any other exception.'() {
@@ -279,19 +279,47 @@ class AbstractModelLoaderSpec extends Specification {
             'Module revision does not exist' || []                       || false
     }
 
-    private void assertLogContains(String message) {
-        def logs = loggingListAppender.list.toString()
-        assert logs.contains(message)
+    def 'Check if this instance is master when: #scenario.'() {
+        given: 'the lock acquisition returns #lockAcquired'
+            mockModelLoaderCoordinatorLock.tryLock() >> lockAcquired
+        when: 'checking if this instance is master'
+            objectUnderTest.checkIfThisInstanceIsMaster()
+        then: 'the master status is set correctly'
+            assert objectUnderTest.@isMaster == expectedMasterStatus
+        and: 'expected log message is produced'
+            assert logContains(expectedLogMessage)
+        where: 'the following scenarios are used'
+            scenario                  | lockAcquired || expectedMasterStatus || expectedLogMessage
+            'lock can be acquired'    | true         || true                 || 'This instance is model loader master'
+            'lock cannot be acquired' | false        || false                || 'Another instance is model loader master'
+    }
+
+    def 'Check if this instance is master when already master.'() {
+        given: 'instance is already master'
+            objectUnderTest.@isMaster = true
+        when: 'check for master again'
+            objectUnderTest.checkIfThisInstanceIsMaster()
+        then: 'instance remains remains master'
+            assert objectUnderTest.@isMaster == true
+        and: 'no attempt is made to lock'
+            0 * mockModelLoaderCoordinatorLock.tryLock()
+        and: 'log reports this instance is master'
+            assert logContains('This instance is model loader master')
+    }
+
+    def logContains(expectedMessage) {
+        return loggingListAppender.list.toString().contains(expectedMessage)
     }
 
     class TestModelLoader extends AbstractModelLoader {
 
-        TestModelLoader(final CpsDataspaceService cpsDataspaceService,
+        TestModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+                        final CpsDataspaceService cpsDataspaceService,
                         final CpsModuleService cpsModuleService,
                         final CpsAnchorService cpsAnchorService,
                         final CpsDataService cpsDataService,
                         final ReadinessManager readinessManager) {
-            super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager)
+            super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager)
         }
 
         @Override
@@ -303,5 +331,6 @@ class AbstractModelLoaderSpec extends Specification {
         String getName() {
             // Not needed for testing
         }
+
     }
 }
index 58bfd5e..75c23c0 100644 (file)
@@ -1,6 +1,7 @@
 /*
  *  ============LICENSE_START=======================================================
  *  Copyright (C) 2024-2025 TechMahindra Ltd.
+ *  Modifications Copyright (c) 2022-2025 OpenInfra Foundation Europe.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -30,8 +31,6 @@ import org.onap.cps.api.CpsModuleService
 import org.onap.cps.api.model.Dataspace
 import org.onap.cps.init.actuator.ReadinessManager
 import org.slf4j.LoggerFactory
-import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
 import org.springframework.context.annotation.AnnotationConfigApplicationContext
 import spock.lang.Specification
 
@@ -41,7 +40,8 @@ class CpsNotificationSubscriptionModelLoaderSpec extends Specification {
     def mockCpsDataService = Mock(CpsDataService)
     def mockCpsAnchorService = Mock(CpsAnchorService)
     def mockReadinessManager = Mock(ReadinessManager)
-    def objectUnderTest = new CpsNotificationSubscriptionModelLoader(mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
+    def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
+    def objectUnderTest = new CpsNotificationSubscriptionModelLoader(mockModelLoaderCoordinatorLock, mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
 
     def applicationContext = new AnnotationConfigApplicationContext()
 
@@ -55,6 +55,7 @@ class CpsNotificationSubscriptionModelLoaderSpec extends Specification {
     def MODEL_FILENAME = 'cps-notification-subscriptions@2024-07-03.yang'
 
     void setup() {
+        objectUnderTest.isMaster = true
         expectedYangResourcesToContents = objectUnderTest.mapYangResourcesToContent(MODEL_FILENAME)
         logger.setLevel(Level.DEBUG)
         loggingListAppender = new ListAppender()
@@ -72,8 +73,8 @@ class CpsNotificationSubscriptionModelLoaderSpec extends Specification {
     def 'Onboard subscription model via application ready event.'() {
         given: 'dataspace is already present'
             mockCpsDataspaceService.getAllDataspaces() >> [new Dataspace('test')]
-        when: 'the application is ready'
-            objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
+        when: 'the model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
         then: 'the module service to create schema set is called once'
             1 * mockCpsModuleService.createSchemaSet(CPS_DATASPACE_NAME, SCHEMASET_NAME, expectedYangResourcesToContents)
         and: 'the anchor service to create an anchor set is called once'
@@ -81,4 +82,13 @@ class CpsNotificationSubscriptionModelLoaderSpec extends Specification {
         and: 'the data service to create a top level datanode is called once'
             1 * mockCpsDataService.saveData(CPS_DATASPACE_NAME, ANCHOR_NAME, '{"dataspaces":{}}', _)
     }
+
+    def 'Onboarding is skipped when instance is not master'() {
+        given: 'instance is not master'
+            objectUnderTest.isMaster = false
+        when: 'the model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
+        then: 'the onboard/upgrade methods are not executed'
+            0 * mockCpsModuleService.createSchemaSet(*_)
+    }
 }
diff --git a/cps-service/src/test/groovy/org/onap/cps/init/ModelLoaderCoordinatorEndSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/init/ModelLoaderCoordinatorEndSpec.groovy
new file mode 100644 (file)
index 0000000..3ed95a6
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init
+
+import org.onap.cps.init.actuator.ReadinessManager
+import org.onap.cps.utils.Sleeper
+import spock.lang.Specification
+
+class ModelLoaderCoordinatorEndSpec extends Specification {
+
+    def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
+    def spiedSleeper = Spy(new Sleeper())
+    def mockReadinessManager = Mock(ReadinessManager)
+
+    def objectUnderTest = new ModelLoaderCoordinatorEnd(mockModelLoaderCoordinatorLock, null, null, null, null, mockReadinessManager, spiedSleeper)
+
+    def 'Model Loader Coordinator End for master instance.'() {
+        given: 'instance is master'
+            objectUnderTest.isMaster = true
+        when: 'model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
+        then: 'the model loader coordinator lock is released'
+            1 * mockModelLoaderCoordinatorLock.forceUnlock()
+        and: 'no checks for locked are made'
+            0 * mockModelLoaderCoordinatorLock.isLocked()
+    }
+
+    def 'Model Loader Coordinator End for non-master instance.'() {
+        given: 'instance is NOT master'
+            objectUnderTest.isMaster = false
+        and: 'the lock will be unlocked upon the third time checking (so expect 3 calls)'
+            3 * mockModelLoaderCoordinatorLock.isLocked() >>> [ true, true, false ]
+        when: 'model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
+        then: 'the system sleeps twice'
+            2 * spiedSleeper.haveALittleRest(_)
+    }
+
+    def 'Model Loader Coordinator End for non-master with exception during sleep.'() {
+        given: 'instance is NOT master'
+            objectUnderTest.isMaster = false
+        and: 'attempting the get the lock will succeed on the second attempt'
+            mockModelLoaderCoordinatorLock.isLocked() >>> [ true, false ]
+        when: 'model loader is started'
+            objectUnderTest.onboardOrUpgradeModel()
+        then: 'the system sleeps once (but is interrupted)'
+            1 * spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
+    }
+
+}
diff --git a/cps-service/src/test/groovy/org/onap/cps/init/ModelLoaderCoordinatorLockSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/init/ModelLoaderCoordinatorLockSpec.groovy
new file mode 100644 (file)
index 0000000..37605f6
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init
+
+import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.instance.impl.HazelcastInstanceFactory
+import spock.lang.Specification
+
+class ModelLoaderCoordinatorLockSpec extends Specification {
+    def cpsCommonLocks = HazelcastInstanceFactory.getOrCreateHazelcastInstance(new Config('hazelcastInstanceName')).getMap('cpsCommonLocks')
+
+    def objectUnderTest = new ModelLoaderCoordinatorLock(cpsCommonLocks)
+
+    def cleanupSpec() {
+        Hazelcast.getHazelcastInstanceByName('hazelcastInstanceName').shutdown()
+    }
+
+    def 'Locking and unlocking the coordinator lock.'() {
+        when: 'try to get a lock'
+            assert objectUnderTest.tryLock() == true
+        then: 'the lock is acquired'
+            assert objectUnderTest.isLocked() == true
+        and: 'can get the same lock again (reentrant locking)'
+            assert objectUnderTest.tryLock() == true
+        when: 'release the lock'
+            objectUnderTest.forceUnlock()
+        then: 'the lock is released'
+            assert objectUnderTest.isLocked() == false
+    }
+
+}
index be71d37..1e0c292 100644 (file)
@@ -1,6 +1,6 @@
 #  ============LICENSE_START=======================================================
 #  Copyright (c) 2021 Bell Canada.
-#  Modification Copyright (C) 2022-2024 Nordix Foundation.
+#  Modification Copyright (C) 2022-2025 Nordix Foundation.
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -41,3 +41,12 @@ spring:
 logging:
   level:
     org.apache.kafka: ERROR
+
+# Custom Hazelcast Config.
+hazelcast:
+  cluster-name: "cps-and-ncmp-test-caches"
+  instance-config-name: "cps-and-ncmp-hazelcast-instance-test-config"
+  mode:
+    kubernetes:
+      enabled: false
+      service-name: "cps-and-ncmp-service"
index e7312f8..b379d6e 100644 (file)
@@ -55,6 +55,7 @@ import org.onap.cps.ri.utils.SessionManager
 import org.onap.cps.spi.CpsModulePersistenceService
 import org.onap.cps.utils.JsonObjectMapper
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Qualifier
 import org.springframework.beans.factory.annotation.Value
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.boot.autoconfigure.domain.EntityScan
@@ -146,7 +147,8 @@ abstract class CpsIntegrationSpecBase extends Specification {
     BlockingQueue<String> moduleSyncWorkQueue
 
     @Autowired
-    IMap<String, String> cpsAndNcmpLock
+    @Qualifier("cpsCommonLocks")
+    IMap<String, String> cpsCommonLocks
 
     @Autowired
     JsonObjectMapper jsonObjectMapper