<groupId>${project.groupId}</groupId>
<artifactId>cps-path-parser</artifactId>
</dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast-spring</artifactId>
- </dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
import com.hazelcast.config.MapConfig;
import com.hazelcast.map.IMap;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
*/
@Bean
public IMap<String, Integer> cmHandlesByState() {
- return getOrCreateHazelcastInstance(cmHandleStateCacheMapConfig).getMap(
- "cmHandlesByState");
+ return getOrCreateHazelcastInstance(cmHandleStateCacheMapConfig).getMap("cmHandlesByState");
}
}
import com.hazelcast.config.MapConfig;
import com.hazelcast.map.IMap;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
*
* @return configured map of cm handle id by alternate id
*/
- @Bean
+ @Bean("cmHandleIdPerAlternateId")
public IMap<String, String> cmHandleIdPerAlternateId() {
return getOrCreateHazelcastInstance(cmHandleIdPerAlternateIdMapConfig).getMap("cmHandleIdPerAlternateId");
}
import com.hazelcast.config.MapConfig;
import com.hazelcast.map.IMap;
import java.util.Map;
-import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
}
private final InventoryPersistence inventoryPersistence;
+
@Qualifier("cmHandleIdPerAlternateId")
private final IMap<String, String> cmHandleIdPerAlternateId;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final TrustLevelManager trustLevelManager;
private final AlternateIdChecker alternateIdChecker;
+
@Qualifier("cmHandleIdPerAlternateId")
private final IMap<String, String> cmHandleIdPerAlternateId;
package org.onap.cps.ncmp.impl.inventory.sync;
-import static org.onap.cps.ncmp.impl.cache.CpsAndNcmpLockConfig.MODULE_SYNC_WORK_QUEUE_LOCK_NAME;
-
import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
private final BlockingQueue<String> moduleSyncWorkQueue;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
- @Qualifier("cpsAndNcmpLock")
- private final IMap<String, String> cpsAndNcmpLock;
+ @Qualifier("cpsCommonLocks") private final IMap<String, String> cpsCommonLocks;
private final ReadinessManager readinessManager;
private static final int MODULE_SYNC_BATCH_SIZE = 300;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
-
+ private static final String MODULE_SYNC_WORK_QUEUE_COMMON_LOCK_NAME = "workQueueLock";
/**
* Check DB for any cm handles in 'ADVISED' state.
}
/**
- * Populate work queue with advised cm handles from db.
+ * Populate module sync work queue with advised cm handles from db.
* This method is made public for (integration) testing purposes.
* So it can be tested without the queue being emptied immediately as the main public method does.
*/
public void populateWorkQueueIfNeeded() {
- if (moduleSyncWorkQueue.isEmpty() && cpsAndNcmpLock.tryLock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME)) {
+ if (moduleSyncWorkQueue.isEmpty() && cpsCommonLocks.tryLock(MODULE_SYNC_WORK_QUEUE_COMMON_LOCK_NAME)) {
log.debug("Lock acquired by thread : {}", Thread.currentThread().getName());
try {
populateWorkQueue();
setPreviouslyLockedCmHandlesToAdvised();
}
} finally {
- cpsAndNcmpLock.unlock(MODULE_SYNC_WORK_QUEUE_LOCK_NAME);
+ cpsCommonLocks.unlock(MODULE_SYNC_WORK_QUEUE_COMMON_LOCK_NAME);
log.debug("Lock released by thread : {}", Thread.currentThread().getName());
}
}
import com.hazelcast.config.QueueConfig;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
-import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.NearCacheConfig;
import com.hazelcast.map.IMap;
+import org.onap.cps.impl.cache.HazelcastCacheConfig;
import org.onap.cps.ncmp.api.inventory.models.TrustLevel;
-import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.onap.cps.api.CpsDataspaceService;
import org.onap.cps.api.CpsModuleService;
import org.onap.cps.init.AbstractModelLoader;
+import org.onap.cps.init.ModelLoaderCoordinatorLock;
import org.onap.cps.init.actuator.ReadinessManager;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
private static final String ANCHOR_NAME = "cm-data-job-subscriptions";
private static final String REGISTRY_DATA_NODE_NAME = "dataJob";
- public CmDataSubscriptionModelLoader(final CpsDataspaceService cpsDataspaceService,
+ public CmDataSubscriptionModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+ final CpsDataspaceService cpsDataspaceService,
final CpsModuleService cpsModuleService,
final CpsAnchorService cpsAnchorService,
final CpsDataService cpsDataService,
final ReadinessManager readinessManager) {
- super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager);
+ super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+ readinessManager);
}
@Override
public void onboardOrUpgradeModel() {
- log.info("Model Loader #3 Started: NCMP CM Data Notification Subscription Models");
- onboardSubscriptionModels();
- log.info("Model Loader #3 Completed");
+ if (isMaster) {
+ log.info("Model Loader #3 Started: NCMP CM Data Notification Subscription Models");
+ onboardSubscriptionModels();
+ log.info("Model Loader #3 Completed");
+ } else {
+ logMessageForNonMasterInstance();
+ }
}
private void onboardSubscriptionModels() {
import org.onap.cps.api.CpsDataspaceService;
import org.onap.cps.api.CpsModuleService;
import org.onap.cps.init.AbstractModelLoader;
+import org.onap.cps.init.ModelLoaderCoordinatorLock;
import org.onap.cps.init.actuator.ReadinessManager;
import org.onap.cps.ncmp.utils.events.NcmpInventoryModelOnboardingFinishedEvent;
import org.springframework.beans.factory.annotation.Value;
* Creates a new {@code InventoryModelLoader} instance responsible for onboarding or upgrading
* the NCMP inventory model schema sets and managing readiness state during migration.
*/
- public InventoryModelLoader(final CpsDataspaceService cpsDataspaceService,
+ public InventoryModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+ final CpsDataspaceService cpsDataspaceService,
final CpsModuleService cpsModuleService,
final CpsAnchorService cpsAnchorService,
final CpsDataService cpsDataService,
final ApplicationEventPublisher applicationEventPublisher,
final ReadinessManager readinessManager) {
- super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager);
+ super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+ readinessManager);
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public void onboardOrUpgradeModel() {
- log.info("Model Loader #2 Started: NCMP Inventory Models");
- final String schemaToInstall = newRevisionEnabled ? NEW_INVENTORY_SCHEMA_SET_NAME : PREVIOUS_SCHEMA_SET_NAME;
- final String moduleRevision = getModuleRevision(schemaToInstall);
+ if (isMaster) {
+ log.info("Model Loader #2 Started: NCMP Inventory Models");
+ final String schemaToInstall =
+ newRevisionEnabled ? NEW_INVENTORY_SCHEMA_SET_NAME : PREVIOUS_SCHEMA_SET_NAME;
+ final String moduleRevision = getModuleRevision(schemaToInstall);
- if (isModuleRevisionInstalled(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, INVENTORY_YANG_MODULE_NAME,
+ if (isModuleRevisionInstalled(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, INVENTORY_YANG_MODULE_NAME,
moduleRevision)) {
- log.info("Revision {} is already installed.", moduleRevision);
- } else if (newRevisionEnabled && doesAnchorExist(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR)) {
- upgradeAndMigrateInventoryModel();
+ log.info("Revision {} is already installed.", moduleRevision);
+ } else if (newRevisionEnabled && doesAnchorExist(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR)) {
+ upgradeAndMigrateInventoryModel();
+ } else {
+ installInventoryModel(schemaToInstall);
+ }
+ applicationEventPublisher.publishEvent(new NcmpInventoryModelOnboardingFinishedEvent(this));
+ log.info("Model Loader #2 Completed");
} else {
- installInventoryModel(schemaToInstall);
+ logMessageForNonMasterInstance();
}
- applicationEventPublisher.publishEvent(new NcmpInventoryModelOnboardingFinishedEvent(this));
- log.info("Model Loader #2 Completed");
}
private void installInventoryModel(final String schemaSetName) {
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2025 Nordix Foundation
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import ch.qos.logback.core.read.ListAppender
import com.hazelcast.map.IMap
import org.onap.cps.init.actuator.ReadinessManager
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.slf4j.LoggerFactory
+import spock.lang.Specification
import java.util.concurrent.ArrayBlockingQueue
-import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import spock.lang.Specification
class ModuleSyncWatchdogSpec extends Specification {
def mockModuleSyncTasks = Mock(ModuleSyncTasks)
- def mockCpsAndNcmpLock = Mock(IMap<String,String>)
+ def mockCpsCommonLocks = Mock(IMap<String,String>)
def mockReadinessManager = Mock(ReadinessManager)
- def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock, mockReadinessManager)
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsCommonLocks, mockReadinessManager)
def logAppender = Spy(ListAppender<ILoggingEvent>)
and: 'module sync utilities returns no failed (locked) cm handles'
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
and: 'the work queue can be locked'
- mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
+ mockCpsCommonLocks.tryLock('workQueueLock') >> true
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs #expectedNumberOfTaskExecutions tasks'
expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(*_)
and: 'the executing thread is unlocked'
- 1 * mockCpsAndNcmpLock.unlock('workQueueLock')
+ 1 * mockCpsCommonLocks.unlock('workQueueLock')
where: 'the following parameter are used'
scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
'none at all' | 0 || 0
and: 'module sync utilities returns a advise cm handles'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
- mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
+ mockCpsCommonLocks.tryLock('workQueueLock') >> true
when: ' module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
then: 'it performs one task'
and: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
- mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
+ mockCpsCommonLocks.tryLock('workQueueLock') >> true
and: 'the semaphore cache indicates the cm handle is already being processed'
mockModuleSyncStartedOnCmHandles.putIfAbsent(*_) >> 'Started'
when: 'module sync is started'
and: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'can be locked is : #canLock'
- mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock
+ mockCpsCommonLocks.tryLock('workQueueLock') >> canLock
when: 'attempt to populate the work queue'
objectUnderTest.populateWorkQueueIfNeeded()
then: 'the queue remains empty is #expectQueueRemainsEmpty'
assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty
and: 'unlock is called only when thread is able to enter the critical section'
- expectedInvocationToUnlock * mockCpsAndNcmpLock.unlock('workQueueLock')
+ expectedInvocationToUnlock * mockCpsCommonLocks.unlock('workQueueLock')
where: 'the following lock states are applied'
canLock || expectQueueRemainsEmpty || expectedInvocationToUnlock
false || true || 0
package org.onap.cps.ncmp.impl.inventory.sync
-import com.hazelcast.config.Config
+
import com.hazelcast.core.Hazelcast
import com.hazelcast.map.IMap
-import java.util.concurrent.BlockingQueue
-import java.util.concurrent.TimeUnit
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.ContextConfiguration
import spock.lang.Specification
import spock.util.concurrent.PollingConditions
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.TimeUnit
+
@SpringBootTest
@ContextConfiguration(classes = [SynchronizationCacheConfig])
class SynchronizationCacheConfigSpec extends Specification {
assert !hzConfig.join.kubernetesConfig.enabled
}
- def 'Verify network config'() {
- given: 'Synchronization config object and test configuration'
- def objectUnderTest = new SynchronizationCacheConfig()
- def testConfig = new Config()
- when: 'kubernetes properties are enabled'
- objectUnderTest.cacheKubernetesEnabled = true
- objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
- and: 'method called to update the discovery mode'
- objectUnderTest.updateDiscoveryMode(testConfig)
- then: 'applied properties are reflected'
- assert testConfig.networkConfig.join.kubernetesConfig.enabled
- assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
- }
-
def 'Time to Live Verify for Module Sync Semaphore'() {
when: 'the key is inserted with a TTL'
moduleSyncStartedOnCmHandles.put('testKeyModuleSync', 'toBeExpired' as Object, 100, TimeUnit.MILLISECONDS)
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.impl.inventory.trustlevel
-import com.hazelcast.config.Config
import com.hazelcast.core.Hazelcast
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.api.inventory.models.TrustLevel
assert !trustLevelPerCmHandlePluginCacheConfig.join.kubernetesConfig.enabled
}
- def 'Verify network config'() {
- given: 'Synchronization config object and test configuration'
- def objectUnderTest = new TrustLevelCacheConfig()
- def testConfig = new Config()
- when: 'kubernetes properties are enabled'
- objectUnderTest.cacheKubernetesEnabled = true
- objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
- and: 'method called to update the discovery mode'
- objectUnderTest.updateDiscoveryMode(testConfig)
- then: 'applied properties are reflected'
- assert testConfig.networkConfig.join.kubernetesConfig.enabled
- assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
- }
-
}
import org.onap.cps.api.CpsDataspaceService
import org.onap.cps.api.CpsModuleService
import org.onap.cps.api.model.Dataspace
+import org.onap.cps.init.ModelLoaderCoordinatorLock
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
-import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import spock.lang.Specification
class CmDataSubscriptionModelLoaderSpec extends Specification {
+ def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
def mockCpsDataspaceService = Mock(CpsDataspaceService)
def mockCpsModuleService = Mock(CpsModuleService)
def mockCpsDataService = Mock(CpsDataService)
def mockCpsAnchorService = Mock(CpsAnchorService)
def mockReadinessManager = Mock(ReadinessManager)
- def objectUnderTest = new CmDataSubscriptionModelLoader(mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
+ def objectUnderTest = new CmDataSubscriptionModelLoader(mockModelLoaderCoordinatorLock, mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
def applicationContext = new AnnotationConfigApplicationContext()
def loggingListAppender
void setup() {
+ objectUnderTest.isMaster = true
expectedYangResourcesToContentMap = objectUnderTest.mapYangResourcesToContent('cm-data-job-subscriptions@2025-09-03.yang')
logger.setLevel(Level.DEBUG)
loggingListAppender = new ListAppender()
def 'Onboard subscription model via application ready event.'() {
given: 'dataspace is ready for use'
mockCpsDataspaceService.getDataspace(NCMP_DATASPACE_NAME) >> new Dataspace('')
- when: 'the application is ready'
- objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
+ when: 'the model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
then: 'the module service to create schema set is called once'
1 * mockCpsModuleService.createSchemaSet(NCMP_DATASPACE_NAME, 'cm-data-job-subscriptions', expectedYangResourcesToContentMap)
and: 'the admin service to create an anchor set is called once'
1 * mockCpsDataService.saveData(NCMP_DATASPACE_NAME, 'cm-data-job-subscriptions', '{"dataJob":{}}', _)
}
+ def 'Onboarding is skipped when instance is not master'() {
+ given: 'instance is not master'
+ objectUnderTest.isMaster = false
+ when: 'the model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
+ then: 'the onboard/upgrade methods are not executed'
+ 0 * mockCpsModuleService.createSchemaSet(*_)
+ }
+
}
import org.onap.cps.api.exceptions.AnchorNotFoundException
import org.onap.cps.api.model.Dataspace
import org.onap.cps.api.model.ModuleDefinition
+import org.onap.cps.init.ModelLoaderCoordinatorLock
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import spock.lang.Specification
class InventoryModelLoaderSpec extends Specification {
+ def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
def mockCpsAdminService = Mock(CpsDataspaceService)
def mockCpsModuleService = Mock(CpsModuleService)
def mockCpsDataService = Mock(CpsDataService)
def mockCpsAnchorService = Mock(CpsAnchorService)
def mockApplicationEventPublisher = Mock(ApplicationEventPublisher)
def mockReadinessManager = Mock(ReadinessManager)
- def objectUnderTest = new InventoryModelLoader(mockCpsAdminService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockApplicationEventPublisher, mockReadinessManager)
+ def objectUnderTest = new InventoryModelLoader(mockModelLoaderCoordinatorLock, mockCpsAdminService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockApplicationEventPublisher, mockReadinessManager)
def applicationContext = new AnnotationConfigApplicationContext()
def loggingListAppender
void setup() {
+ objectUnderTest.isMaster = true
expectedPreviousYangResourceToContentMap = objectUnderTest.mapYangResourcesToContent('dmi-registry@2024-02-23.yang')
expectedNewYangResourceToContentMap = objectUnderTest.mapYangResourcesToContent('dmi-registry@2025-07-22.yang')
objectUnderTest.newRevisionEnabled = true
assert loggingListAppender.list.any { it.message.contains("Inventory upgraded successfully") }
}
+ def 'Onboarding is skipped when instance is not master'() {
+ given: 'instance is not master'
+ objectUnderTest.isMaster = false
+ when: 'the model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
+ then: 'the onboard/upgrade methods are not executed'
+ 0 * mockCpsModuleService.createSchemaSet(*_)
+ }
+
+
def 'Skip upgrade model revision when new revision already installed'() {
given: 'the anchor exists and the new model revision is already installed'
mockCpsAnchorService.getAnchor(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR) >> {}
- mockCpsModuleService.getModuleDefinitionsByAnchorAndModule(_, _, _, _) >> [new ModuleDefinition('', '', '')]
- when: 'the inventory model loader is triggered'
+ mockCpsModuleService.getModuleDefinitionsByAnchorAndModule(*_) >> [new ModuleDefinition('', '', '')]
+ when: 'the model loader is started'
objectUnderTest.onboardOrUpgradeModel()
then: 'no new schema set is created'
- 0 * mockCpsModuleService.createSchemaSet(_, _, _)
+ 0 * mockCpsModuleService.createSchemaSet(*_)
and: 'a log message confirms the revision is already installed'
assert loggingListAppender.list.any { it.message.contains("already installed") }
}
+
+
}
# ============LICENSE_START=======================================================
-# Copyright (C) 2021 Nordix Foundation
+# Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
+lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
<groupId>org.onap.cps</groupId>
<artifactId>cps-path-parser</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast-spring</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-model-api</artifactId>
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cache;
+package org.onap.cps.impl.cache;
import com.hazelcast.config.MapConfig;
import com.hazelcast.map.IMap;
import org.springframework.context.annotation.Configuration;
@Configuration
-public class CpsAndNcmpLockConfig extends HazelcastCacheConfig {
+public class CpsCommonLocksConfig extends HazelcastCacheConfig {
- // Lock names for different use cases ( to be used as cpsAndNcmpLock keys)
- public static final String MODULE_SYNC_WORK_QUEUE_LOCK_NAME = "workQueueLock";
-
- private static final MapConfig cpsAndNcmpLockMapConfig = createGenericMapConfig("cpsAndNcmpLockConfig");
+ private static final MapConfig cpsCommonLocksMapConfig = createGenericMapConfig("cpsCommonLocksMapConfig");
/**
- * Distributed instance used for locking purpose for various use cases in cps-and-ncmp.
+ * Distributed instance used for locking purpose for various use cases in cps and dependant services.
* The key of the map entry is name of the lock and should be based on the use case we are locking.
*
- * @return configured map of lock object to have distributed coordination.
+ * @return configured map of lock objects to have distributed coordination.
*/
- @Bean
- public IMap<String, String> cpsAndNcmpLock() {
- return getOrCreateHazelcastInstance(cpsAndNcmpLockMapConfig).getMap("cpsAndNcmpLock");
+ @Bean("cpsCommonLocks")
+ public IMap<String, String> cpsCommonLocks() {
+ return getOrCreateHazelcastInstance(cpsCommonLocksMapConfig).getMap("cpsCommonLocks");
}
-
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cache;
+package org.onap.cps.impl.cache;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
protected HazelcastInstance getOrCreateHazelcastInstance(final NamedConfig namedConfig) {
return Hazelcast.getOrCreateHazelcastInstance(defineInstanceConfig(instanceConfigName, namedConfig));
-
}
private Config defineInstanceConfig(final String instanceConfigName, final NamedConfig namedConfig) {
return mapConfig;
}
-
-
protected static QueueConfig createQueueConfig(final String configName) {
final QueueConfig commonQueueConfig = new QueueConfig(configName);
commonQueueConfig.setBackupCount(1);
@RequiredArgsConstructor
public abstract class AbstractModelLoader implements ModelLoader {
+ protected final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock;
protected final CpsDataspaceService cpsDataspaceService;
private final CpsModuleService cpsModuleService;
protected final CpsAnchorService cpsAnchorService;
protected final CpsDataService cpsDataService;
protected final ReadinessManager readinessManager;
+ protected boolean isMaster = false;
+
private final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper());
private static final int EXIT_CODE_ON_ERROR = 1;
@Override
public void onApplicationEvent(final ApplicationReadyEvent applicationReadyEvent) {
try {
+ checkIfThisInstanceIsMaster();
onboardOrUpgradeModel();
} catch (final Exception exception) {
- log.error("Exiting application due to failure in onboarding model: {} ",
+ log.error("Exiting application due to failure in onboarding model: {}",
exception.getMessage());
exitApplication(applicationReadyEvent);
} finally {
return !moduleDefinitions.isEmpty();
}
+ public void logMessageForNonMasterInstance() {
+ log.info("This instance is not model loader master, skipping model loader for: {}", getName());
+ }
Map<String, String> mapYangResourcesToContent(final String... resourceNames) {
final Map<String, String> yangResourceContentByName = new HashMap<>();
return yangResourceContentByName;
}
+ void checkIfThisInstanceIsMaster() {
+ isMaster = isMaster || modelLoaderCoordinatorLock.tryLock();
+ if (isMaster) {
+ log.info("This instance is model loader master");
+ } else {
+ log.info("Another instance is model loader master");
+ }
+ }
+
private String getFileContentAsString(final String fileName) {
try (final InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName)) {
return new String(inputStream != null ? inputStream.readAllBytes() : null, StandardCharsets.UTF_8);
private static final String CPS_DATASPACE_NAME = "CPS-Admin";
private static final String REGISTRY_DATANODE_NAME = "dataspaces";
- public CpsNotificationSubscriptionModelLoader(final CpsDataspaceService cpsDataspaceService,
+ public CpsNotificationSubscriptionModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+ final CpsDataspaceService cpsDataspaceService,
final CpsModuleService cpsModuleService,
final CpsAnchorService cpsAnchorService,
final CpsDataService cpsDataService,
final ReadinessManager readinessManager) {
- super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager);
+ super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+ readinessManager);
}
@Override
public void onboardOrUpgradeModel() {
- log.info("Model Loader #1 Started: CPS Data Notification Subscription Models");
- onboardSubscriptionModels();
- log.info("Model Loader #1 Completed");
+ if (isMaster) {
+ log.info("Model Loader #1 Started: CPS Data Notification Subscription Models");
+ onboardSubscriptionModels();
+ log.info("Model Loader #1 Completed");
+ } else {
+ logMessageForNonMasterInstance();
+ }
}
private void onboardSubscriptionModels() {
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init;
+
+import static org.springframework.core.Ordered.LOWEST_PRECEDENCE;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.api.CpsAnchorService;
+import org.onap.cps.api.CpsDataService;
+import org.onap.cps.api.CpsDataspaceService;
+import org.onap.cps.api.CpsModuleService;
+import org.onap.cps.init.actuator.ReadinessManager;
+import org.onap.cps.utils.Sleeper;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@Order(LOWEST_PRECEDENCE)
+public class ModelLoaderCoordinatorEnd extends AbstractModelLoader {
+
+ final Sleeper sleeper;
+
+ /**
+ * Constructor.
+ *
+ * @param modelLoaderCoordinatorLock the modelLoaderCoordinatorLock
+ * @param cpsDataspaceService the cpsDataspaceService
+ * @param cpsModuleService the cpsModuleService
+ * @param cpsAnchorService the cpsAnchorService
+ * @param cpsDataService the cpsDataService
+ * @param readinessManager the readinessManager
+ * @param sleeper the sleeper
+ */
+ public ModelLoaderCoordinatorEnd(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+ final CpsDataspaceService cpsDataspaceService,
+ final CpsModuleService cpsModuleService,
+ final CpsAnchorService cpsAnchorService,
+ final CpsDataService cpsDataService,
+ final ReadinessManager readinessManager,
+ final Sleeper sleeper) {
+ super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService,
+ readinessManager);
+ this.sleeper = sleeper;
+ }
+
+ @Override
+ public void onboardOrUpgradeModel() {
+ log.info("Model Loader #999 Started: Coordinator End, check model loaders are completed");
+ if (isMaster) {
+ releaseLock();
+ log.info("This instance is model loader master. Model loading completed");
+ } else {
+ log.info("Wait for model loader master to finish");
+ waitForMasterToFinish();
+ }
+ log.info("Model Loader #999 Completed");
+ }
+
+ private void releaseLock() {
+ modelLoaderCoordinatorLock.forceUnlock();
+ log.info("Model loading (on master) finished");
+ }
+
+ private void waitForMasterToFinish() {
+ while (modelLoaderCoordinatorLock.isLocked()) {
+ log.info("Still waiting for model loader master to finish");
+ try {
+ sleeper.haveALittleRest(100);
+ } catch (final InterruptedException e) {
+ log.warn("I cannot sleep (ignored)");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init;
+
+import com.hazelcast.map.IMap;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class ModelLoaderCoordinatorLock {
+
+ @Qualifier("cpsCommonLocks")
+ private final IMap<String, String> cpsCommonLocks;
+
+ private static final String MODULE_LOADER_LOCK_NAME = "modelLoaderLock";
+
+ boolean tryLock() {
+ return cpsCommonLocks.tryLock(MODULE_LOADER_LOCK_NAME);
+ }
+
+ boolean isLocked() {
+ return cpsCommonLocks.isLocked(MODULE_LOADER_LOCK_NAME);
+ }
+
+ void forceUnlock() {
+ cpsCommonLocks.forceUnlock(MODULE_LOADER_LOCK_NAME);
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.utils;
+
+import java.util.concurrent.TimeUnit;
+import org.springframework.stereotype.Service;
+
+/**
+ * This class is to extract out sleep functionality so the interrupted exception handling can
+ * be covered with a test (e.g. using spy on Sleeper) and help to get too 100% code coverage.
+ */
+@Service
+public class Sleeper {
+ public void haveALittleRest(final int timeInMilliSeconds) throws InterruptedException {
+ TimeUnit.MILLISECONDS.sleep(timeInMilliSeconds);
+ }
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.impl.cache
+
+import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.map.IMap
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+
+@SpringBootTest
+@ContextConfiguration(classes = [CpsCommonLocksConfig])
+class CpsCommonLocksConfigSpec extends Specification {
+
+ @Autowired
+ IMap<String, String> cpsCommonLocks
+
+ def cleanupSpec() {
+ Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').shutdown()
+ }
+
+ def 'Hazelcast common locks'() {
+ expect: 'system is able to create an instance of the common locks cache'
+ assert null != cpsCommonLocks
+ and: 'there is at least 1 instance'
+ assert Hazelcast.allHazelcastInstances.size() > 0
+ and: 'Hazelcast cache instance for common locks is present'
+ assert Hazelcast.getHazelcastInstanceByName('cps-and-ncmp-hazelcast-instance-test-config').getMap('cpsCommonLocks') != null
+ }
+
+ def 'Verify network config'() {
+ given: 'Synchronization config object and test configuration'
+ def objectUnderTest = new CpsCommonLocksConfig()
+ def testConfig = new Config()
+ when: 'kubernetes properties are enabled'
+ objectUnderTest.cacheKubernetesEnabled = true
+ objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
+ and: 'method called to update the discovery mode'
+ objectUnderTest.updateDiscoveryMode(testConfig)
+ then: 'applied properties are reflected'
+ assert testConfig.networkConfig.join.kubernetesConfig.enabled
+ assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
+ }
+}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.impl.cache
+package org.onap.cps.impl.cache
import com.hazelcast.core.Hazelcast
import spock.lang.Specification
assert result.config.setConfigs.values()[0].name == 'my set config'
}
where: 'the following configs are used'
- scenario | config || expectMapConfig | expectQueueConfig | expectSetConfig
- 'Map Config' | HazelcastCacheConfig.createGenericMapConfig('my map config') || true | false | false
- 'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false | true | false
- 'Set Config' | HazelcastCacheConfig.createSetConfig('my set config') || false | false | true
+ scenario | config || expectMapConfig | expectQueueConfig | expectSetConfig
+ 'Map Config' | HazelcastCacheConfig.createGenericMapConfig('my map config') || true | false | false
+ 'Queue Config' | HazelcastCacheConfig.createQueueConfig('my queue config') || false | true | false
+ 'Set Config' | HazelcastCacheConfig.createSetConfig('my set config') || false | false | true
}
}
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsDataspaceService
import org.onap.cps.api.CpsModuleService
+import org.onap.cps.api.exceptions.AlreadyDefinedException
import org.onap.cps.api.exceptions.AnchorNotFoundException
import org.onap.cps.api.exceptions.DuplicatedYangResourceException
import org.onap.cps.api.exceptions.ModelOnboardingException
import org.onap.cps.api.model.ModuleDefinition
import org.onap.cps.api.parameters.CascadeDeleteAllowed
-import org.onap.cps.api.exceptions.AlreadyDefinedException
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
import org.springframework.boot.SpringApplication
import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import spock.lang.Specification
class AbstractModelLoaderSpec extends Specification {
+ def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
def mockCpsDataspaceService = Mock(CpsDataspaceService)
def mockCpsModuleService = Mock(CpsModuleService)
def mockCpsDataService = Mock(CpsDataService)
def mockCpsAnchorService = Mock(CpsAnchorService)
def mockReadinessManager = Mock(ReadinessManager)
- def objectUnderTest = Spy(new TestModelLoader(mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager))
+ def objectUnderTest = Spy(new TestModelLoader(mockModelLoaderCoordinatorLock, mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager))
def applicationContext = new AnnotationConfigApplicationContext()
def logger = (Logger) LoggerFactory.getLogger(AbstractModelLoader)
def loggingListAppender
- void setup() {
+ def setup() {
logger.setLevel(Level.DEBUG)
loggingListAppender = new ListAppender()
logger.addAppender(loggingListAppender)
applicationContext.refresh()
}
- void cleanup() {
+ def cleanup() {
logger.detachAndStopAllAppenders()
applicationContext.close()
loggingListAppender.stop()
then: 'the exception is ignored i.e. no exception thrown up'
noExceptionThrown()
and: 'the exception is ignored, and a log message is produced'
- assertLogContains('Dataspace already exists')
+ assert logContains('Dataspace already exists')
}
def 'Creating a dataspace handles other exception.'() {
objectUnderTest.createSchemaSet('some dataspace','some schema set','cps-notification-subscriptions@2024-07-03.yang')
then: 'exception is ignored, and correct exception message is logged'
noExceptionThrown()
- assertLogContains('Ignoring yang resource duplication exception. Assuming model was created by another instance')
+ assert logContains('Ignoring yang resource duplication exception. Assuming model was created by another instance')
}
def 'Creating a schema set handles already defined exception.'() {
objectUnderTest.createSchemaSet('some dataspace','new name','cps-notification-subscriptions@2024-07-03.yang')
then: 'the exception is ignored, and a log message is produced'
noExceptionThrown()
- assertLogContains('Creating new schema set failed as schema set already exists')
+ assert logContains('Creating new schema set failed as schema set already exists')
}
def 'Creating a schema set from a non-existing YANG file.'() {
objectUnderTest.createAnchor('some dataspace','some schema set','new name')
then: 'the exception is ignored, and a log message is produced'
noExceptionThrown()
- assertLogContains('Creating new anchor failed as anchor already exists')
+ assert logContains('Creating new anchor failed as anchor already exists')
}
def 'Creating an anchor handles other exceptions.'() {
objectUnderTest.createTopLevelDataNode('dataspace','anchor','new node')
then: 'the exception is ignored, and a log message is produced'
noExceptionThrown()
- assertLogContains('failed as data node already exists')
+ assert logContains('failed as data node already exists')
}
def 'Create a top-level node with any other exception.'() {
'Module revision does not exist' || [] || false
}
- private void assertLogContains(String message) {
- def logs = loggingListAppender.list.toString()
- assert logs.contains(message)
+ def 'Check if this instance is master when: #scenario.'() {
+ given: 'the lock acquisition returns #lockAcquired'
+ mockModelLoaderCoordinatorLock.tryLock() >> lockAcquired
+ when: 'checking if this instance is master'
+ objectUnderTest.checkIfThisInstanceIsMaster()
+ then: 'the master status is set correctly'
+ assert objectUnderTest.@isMaster == expectedMasterStatus
+ and: 'expected log message is produced'
+ assert logContains(expectedLogMessage)
+ where: 'the following scenarios are used'
+ scenario | lockAcquired || expectedMasterStatus || expectedLogMessage
+ 'lock can be acquired' | true || true || 'This instance is model loader master'
+ 'lock cannot be acquired' | false || false || 'Another instance is model loader master'
+ }
+
+ def 'Check if this instance is master when already master.'() {
+ given: 'instance is already master'
+ objectUnderTest.@isMaster = true
+ when: 'check for master again'
+ objectUnderTest.checkIfThisInstanceIsMaster()
+ then: 'instance remains remains master'
+ assert objectUnderTest.@isMaster == true
+ and: 'no attempt is made to lock'
+ 0 * mockModelLoaderCoordinatorLock.tryLock()
+ and: 'log reports this instance is master'
+ assert logContains('This instance is model loader master')
+ }
+
+ def logContains(expectedMessage) {
+ return loggingListAppender.list.toString().contains(expectedMessage)
}
class TestModelLoader extends AbstractModelLoader {
- TestModelLoader(final CpsDataspaceService cpsDataspaceService,
+ TestModelLoader(final ModelLoaderCoordinatorLock modelLoaderCoordinatorLock,
+ final CpsDataspaceService cpsDataspaceService,
final CpsModuleService cpsModuleService,
final CpsAnchorService cpsAnchorService,
final CpsDataService cpsDataService,
final ReadinessManager readinessManager) {
- super(cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager)
+ super(modelLoaderCoordinatorLock, cpsDataspaceService, cpsModuleService, cpsAnchorService, cpsDataService, readinessManager)
}
@Override
String getName() {
// Not needed for testing
}
+
}
}
/*
* ============LICENSE_START=======================================================
* Copyright (C) 2024-2025 TechMahindra Ltd.
+ * Modifications Copyright (c) 2022-2025 OpenInfra Foundation Europe.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.onap.cps.api.model.Dataspace
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
-import org.springframework.boot.context.event.ApplicationReadyEvent
-import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import spock.lang.Specification
def mockCpsDataService = Mock(CpsDataService)
def mockCpsAnchorService = Mock(CpsAnchorService)
def mockReadinessManager = Mock(ReadinessManager)
- def objectUnderTest = new CpsNotificationSubscriptionModelLoader(mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
+ def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
+ def objectUnderTest = new CpsNotificationSubscriptionModelLoader(mockModelLoaderCoordinatorLock, mockCpsDataspaceService, mockCpsModuleService, mockCpsAnchorService, mockCpsDataService, mockReadinessManager)
def applicationContext = new AnnotationConfigApplicationContext()
def MODEL_FILENAME = 'cps-notification-subscriptions@2024-07-03.yang'
void setup() {
+ objectUnderTest.isMaster = true
expectedYangResourcesToContents = objectUnderTest.mapYangResourcesToContent(MODEL_FILENAME)
logger.setLevel(Level.DEBUG)
loggingListAppender = new ListAppender()
def 'Onboard subscription model via application ready event.'() {
given: 'dataspace is already present'
mockCpsDataspaceService.getAllDataspaces() >> [new Dataspace('test')]
- when: 'the application is ready'
- objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
+ when: 'the model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
then: 'the module service to create schema set is called once'
1 * mockCpsModuleService.createSchemaSet(CPS_DATASPACE_NAME, SCHEMASET_NAME, expectedYangResourcesToContents)
and: 'the anchor service to create an anchor set is called once'
and: 'the data service to create a top level datanode is called once'
1 * mockCpsDataService.saveData(CPS_DATASPACE_NAME, ANCHOR_NAME, '{"dataspaces":{}}', _)
}
+
+ def 'Onboarding is skipped when instance is not master'() {
+ given: 'instance is not master'
+ objectUnderTest.isMaster = false
+ when: 'the model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
+ then: 'the onboard/upgrade methods are not executed'
+ 0 * mockCpsModuleService.createSchemaSet(*_)
+ }
}
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init
+
+import org.onap.cps.init.actuator.ReadinessManager
+import org.onap.cps.utils.Sleeper
+import spock.lang.Specification
+
+class ModelLoaderCoordinatorEndSpec extends Specification {
+
+ def mockModelLoaderCoordinatorLock = Mock(ModelLoaderCoordinatorLock)
+ def spiedSleeper = Spy(new Sleeper())
+ def mockReadinessManager = Mock(ReadinessManager)
+
+ def objectUnderTest = new ModelLoaderCoordinatorEnd(mockModelLoaderCoordinatorLock, null, null, null, null, mockReadinessManager, spiedSleeper)
+
+ def 'Model Loader Coordinator End for master instance.'() {
+ given: 'instance is master'
+ objectUnderTest.isMaster = true
+ when: 'model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
+ then: 'the model loader coordinator lock is released'
+ 1 * mockModelLoaderCoordinatorLock.forceUnlock()
+ and: 'no checks for locked are made'
+ 0 * mockModelLoaderCoordinatorLock.isLocked()
+ }
+
+ def 'Model Loader Coordinator End for non-master instance.'() {
+ given: 'instance is NOT master'
+ objectUnderTest.isMaster = false
+ and: 'the lock will be unlocked upon the third time checking (so expect 3 calls)'
+ 3 * mockModelLoaderCoordinatorLock.isLocked() >>> [ true, true, false ]
+ when: 'model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
+ then: 'the system sleeps twice'
+ 2 * spiedSleeper.haveALittleRest(_)
+ }
+
+ def 'Model Loader Coordinator End for non-master with exception during sleep.'() {
+ given: 'instance is NOT master'
+ objectUnderTest.isMaster = false
+ and: 'attempting the get the lock will succeed on the second attempt'
+ mockModelLoaderCoordinatorLock.isLocked() >>> [ true, false ]
+ when: 'model loader is started'
+ objectUnderTest.onboardOrUpgradeModel()
+ then: 'the system sleeps once (but is interrupted)'
+ 1 * spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init
+
+import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.instance.impl.HazelcastInstanceFactory
+import spock.lang.Specification
+
+class ModelLoaderCoordinatorLockSpec extends Specification {
+ def cpsCommonLocks = HazelcastInstanceFactory.getOrCreateHazelcastInstance(new Config('hazelcastInstanceName')).getMap('cpsCommonLocks')
+
+ def objectUnderTest = new ModelLoaderCoordinatorLock(cpsCommonLocks)
+
+ def cleanupSpec() {
+ Hazelcast.getHazelcastInstanceByName('hazelcastInstanceName').shutdown()
+ }
+
+ def 'Locking and unlocking the coordinator lock.'() {
+ when: 'try to get a lock'
+ assert objectUnderTest.tryLock() == true
+ then: 'the lock is acquired'
+ assert objectUnderTest.isLocked() == true
+ and: 'can get the same lock again (reentrant locking)'
+ assert objectUnderTest.tryLock() == true
+ when: 'release the lock'
+ objectUnderTest.forceUnlock()
+ then: 'the lock is released'
+ assert objectUnderTest.isLocked() == false
+ }
+
+}
# ============LICENSE_START=======================================================
# Copyright (c) 2021 Bell Canada.
-# Modification Copyright (C) 2022-2024 Nordix Foundation.
+# Modification Copyright (C) 2022-2025 Nordix Foundation.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
logging:
level:
org.apache.kafka: ERROR
+
+# Custom Hazelcast Config.
+hazelcast:
+ cluster-name: "cps-and-ncmp-test-caches"
+ instance-config-name: "cps-and-ncmp-hazelcast-instance-test-config"
+ mode:
+ kubernetes:
+ enabled: false
+ service-name: "cps-and-ncmp-service"
import org.onap.cps.spi.CpsModulePersistenceService
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.autoconfigure.domain.EntityScan
BlockingQueue<String> moduleSyncWorkQueue
@Autowired
- IMap<String, String> cpsAndNcmpLock
+ @Qualifier("cpsCommonLocks")
+ IMap<String, String> cpsCommonLocks
@Autowired
JsonObjectMapper jsonObjectMapper