timers:
advised-modules-sync:
- initial-delay-ms: 40000
sleep-time-ms: 5000
cm-handle-data-sync:
- initial-delay-ms: 40000
sleep-time-ms: 30000
model-loader:
retry-time-ms: 1000
"org.onap.cps.cpspath..",
"org.onap.cps.events..",
"org.onap.cps.impl..",
- "org.onap.cps.utils.."));
+ "org.onap.cps.utils..",
+ "org.onap.cps.init.."));
}
timers:
advised-modules-sync:
- initial-delay-ms: 40000
sleep-time-ms: 5000
cm-handle-data-sync:
- initial-delay-ms: 40000
sleep-time-ms: 30000
model-loader:
retry-time-ms: 1000
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsModuleService;
+import org.onap.cps.init.actuator.ReadinessManager;
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
import org.onap.cps.ncmp.api.inventory.models.CompositeState;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
private final CpsDataService cpsDataService;
private final ModuleOperationsUtils moduleOperationsUtils;
private final IMap<String, Boolean> dataSyncSemaphores;
+ private final ReadinessManager readinessManager;
/**
* Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
+ * 'UNSYNCHRONIZED' when the system is in ready state.
+ */
+ @Scheduled(fixedDelayString = "${ncmp.timers.cm-handle-data-sync.sleep-time-ms:30000}")
+ public void scheduledUnsynchronizedReadyCmHandleForInitialDataSync() {
+ if (!readinessManager.isReady()) {
+ log.info("System is not ready yet");
+ return;
+ }
+ executeUnsynchronizedReadyCmHandleForInitialDataSync();
+ }
+
+ /**
+ * This method queries the cm handle state in 'READY' and Operational Datastore Sync State in
* 'UNSYNCHRONIZED'.
*/
- @Scheduled(initialDelayString = "${ncmp.timers.cm-handle-data-sync.initial-delay-ms:40000}",
- fixedDelayString = "${ncmp.timers.cm-handle-data-sync.sleep-time-ms:30000}")
public void executeUnsynchronizedReadyCmHandleForInitialDataSync() {
final List<YangModelCmHandle> unsynchronizedReadyCmHandles =
moduleOperationsUtils.getUnsynchronizedReadyCmHandles();
import java.util.concurrent.BlockingQueue;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.init.actuator.ReadinessManager;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
private final ModuleSyncTasks moduleSyncTasks;
@Qualifier("cpsAndNcmpLock")
private final IMap<String, String> cpsAndNcmpLock;
+ private final ReadinessManager readinessManager;
private static final int MODULE_SYNC_BATCH_SIZE = 300;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
+
/**
* Check DB for any cm handles in 'ADVISED' state.
* Queue and create batches to process them asynchronously.
* This method will only finish when there are no more 'ADVISED' cm handles in the DB.
- * This method is triggered on a configurable interval (ncmp.timers.advised-modules-sync.sleep-time-ms)
+ * This method is triggered on a configurable interval (ncmp.timers.advised-modules-sync.sleep-time-ms) and when the
+ * system is in the ready state.
+ */
+ @Scheduled(fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}")
+ public void scheduledModuleSyncAdvisedCmHandles() {
+ if (!readinessManager.isReady()) {
+ log.info("System is not ready yet");
+ return;
+ }
+ moduleSyncAdvisedCmHandles();
+ }
+
+ /**
+ * This method is used when we dont want the scheduled behaviour.
+ * Mainly used in the integration testware.
*/
- @Scheduled(initialDelayString = "${ncmp.timers.advised-modules-sync.initial-delay-ms:40000}",
- fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}")
public void moduleSyncAdvisedCmHandles() {
log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
package org.onap.cps.ncmp.impl.inventory.sync
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
import com.hazelcast.map.IMap
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsModuleService
+import org.onap.cps.init.actuator.ReadinessManager
import org.onap.cps.ncmp.api.inventory.models.CompositeState
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.inventory.models.CmHandleState
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.slf4j.LoggerFactory
import spock.lang.Specification
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME
def mockCpsDataService = Mock(CpsDataService)
def mockModuleOperationUtils = Mock(ModuleOperationsUtils)
def mockDataSyncSemaphores = Mock(IMap<String,Boolean>)
+ def mockReadinessManager = Mock(ReadinessManager)
def jsonString = '{"stores:bookstore":{"categories":[{"code":"01"}]}}'
- def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsModuleService, mockCpsDataService, mockModuleOperationUtils, mockDataSyncSemaphores)
+ def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsModuleService, mockCpsDataService, mockModuleOperationUtils, mockDataSyncSemaphores, mockReadinessManager)
def compositeState = getCompositeState()
def yangModelCmHandle1 = createSampleYangModelCmHandle('cm-handle-1')
def yangModelCmHandle2 = createSampleYangModelCmHandle('cm-handle-2')
+ def logAppender = Spy(ListAppender<ILoggingEvent>)
+
+ void setup() {
+ def logger = LoggerFactory.getLogger(DataSyncWatchdog)
+ logger.setLevel(Level.INFO)
+ logger.addAppender(logAppender)
+ logAppender.start()
+ }
+
+ void cleanup() {
+ ((Logger) LoggerFactory.getLogger(DataSyncWatchdog.class)).detachAndStopAllAppenders()
+ }
+
+ def 'Data sync watchdog is triggered'(){
+ given: 'the system is not ready to accept traffic'
+ mockReadinessManager.isReady() >> false
+ when: 'data sync is started'
+ objectUnderTest.scheduledUnsynchronizedReadyCmHandleForInitialDataSync()
+ then: 'an event is logged with level INFO'
+ def loggingEvent = getLoggingEvent()
+ assert loggingEvent.level == Level.INFO
+ and: 'the log indicates that the system is not ready yet'
+ assert loggingEvent.formattedMessage == 'System is not ready yet'
+ }
+
def 'Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED.'() {
given: 'sample resource data'
def resourceData = jsonString
+ and: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
and: 'sync utilities returns a cm handle twice'
mockModuleOperationUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2]
and: 'we have the module and root nodes references to form the options field'
mockCpsModuleService.getRootNodeReferences(_, 'cm-handle-1') >> ['some-module-1:some-root-node']
mockCpsModuleService.getRootNodeReferences(_, 'cm-handle-2') >> ['some-module-2:some-root-node']
when: 'data sync poll is executed'
- objectUnderTest.executeUnsynchronizedReadyCmHandleForInitialDataSync()
+ objectUnderTest.scheduledUnsynchronizedReadyCmHandleForInitialDataSync()
then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
1 * mockInventoryPersistence.getCmHandleState('cm-handle-1') >> compositeState
and: 'the sync util returns first resource data'
}
def 'Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED without resource data.'() {
- given: 'sync utilities returns a cm handle'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'sync utilities returns a cm handle'
mockModuleOperationUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1]
and: 'the module service returns the module and root nodes references to form the options field'
mockCpsModuleService.getRootNodeReferences(_,'cm-handle-1') >> ['some-module-1:some-root-node']
when: 'data sync poll is executed'
- objectUnderTest.executeUnsynchronizedReadyCmHandleForInitialDataSync()
+ objectUnderTest.scheduledUnsynchronizedReadyCmHandleForInitialDataSync()
then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
1 * mockInventoryPersistence.getCmHandleState('cm-handle-1') >> compositeState
and: 'the sync util returns no resource data'
}
def 'Data Sync for Cm Handle that is already being processed.'() {
- given: 'sync utilities returns a cm handle'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'sync utilities returns a cm handle'
mockModuleOperationUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1]
and: 'the module service returns the module and root nodes references to form the options field'
mockCpsModuleService.getRootNodeReferences(_,'cm-handle-1') >> ['some-module-1:some-root-node']
and: 'the shared data sync semaphore indicate it is already being processed'
mockDataSyncSemaphores.putIfAbsent('cm-handle-1', _, _, _) >> 'something (not null)'
when: 'data sync poll is executed'
- objectUnderTest.executeUnsynchronizedReadyCmHandleForInitialDataSync()
+ objectUnderTest.scheduledUnsynchronizedReadyCmHandleForInitialDataSync()
then: 'it is NOT processed e.g. state is not requested'
0 * mockInventoryPersistence.getCmHandleState(*_)
}
def 'Data sync handles exception during overall cm handle processing.'() {
- given: 'sync utilities returns a cm handle'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'sync utilities returns a cm handle'
mockModuleOperationUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1]
and: 'semaphore map allows processing'
mockDataSyncSemaphores.putIfAbsent('cm-handle-1', false, _, _) >> null
and: 'getting cm handle state throws exception'
mockInventoryPersistence.getCmHandleState('cm-handle-1') >> { throw new RuntimeException('some exception') }
when: 'data sync poll is executed'
- objectUnderTest.executeUnsynchronizedReadyCmHandleForInitialDataSync()
+ objectUnderTest.scheduledUnsynchronizedReadyCmHandleForInitialDataSync()
then: 'no exception is thrown'
noExceptionThrown()
}
def 'Data sync handles exception during resource data retrieval.'() {
- given: 'sync utilities returns a cm handle'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'sync utilities returns a cm handle'
mockModuleOperationUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1]
and: 'semaphore map allows processing'
mockDataSyncSemaphores.putIfAbsent('cm-handle-1', false, _, _) >> null
and: 'module operations returns module and root nodes references'
mockCpsModuleService.getRootNodeReferences(_,'cm-handle-1') >> ['some-module-1:some-root-node', 'some-module-2:some-root-node']
when: 'data sync poll is executed'
- objectUnderTest.executeUnsynchronizedReadyCmHandleForInitialDataSync()
+ objectUnderTest.scheduledUnsynchronizedReadyCmHandleForInitialDataSync()
then: 'cm handle state is retrieved'
1 * mockInventoryPersistence.getCmHandleState('cm-handle-1') >> compositeState
and: 'first module sync succeeds'
.build()).build())
return compositeState
}
+
+ def getLoggingEvent() {
+ return logAppender.list[0]
+ }
}
package org.onap.cps.ncmp.impl.inventory.sync
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
import com.hazelcast.map.IMap
+import org.onap.cps.init.actuator.ReadinessManager
+import org.slf4j.LoggerFactory
+
import java.util.concurrent.ArrayBlockingQueue
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import spock.lang.Specification
def mockCpsAndNcmpLock = Mock(IMap<String,String>)
- def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock)
+ def mockReadinessManager = Mock(ReadinessManager)
+
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, mockCpsAndNcmpLock, mockReadinessManager)
+
+ def logAppender = Spy(ListAppender<ILoggingEvent>)
+
+ void setup() {
+ def logger = LoggerFactory.getLogger(ModuleSyncWatchdog)
+ logger.setLevel(Level.INFO)
+ logger.addAppender(logAppender)
+ logAppender.start()
+ }
+
+ void cleanup() {
+ ((Logger) LoggerFactory.getLogger(ModuleSyncWatchdog.class)).detachAndStopAllAppenders()
+ }
+
+ def 'Module sync watchdog is triggered'(){
+ given: 'the system is not ready to accept traffic'
+ mockReadinessManager.isReady() >> false
+ when: 'module sync is started'
+ objectUnderTest.scheduledModuleSyncAdvisedCmHandles()
+ then: 'an event is logged with level INFO'
+ def loggingEvent = getLoggingEvent()
+ assert loggingEvent.level == Level.INFO
+ and: 'the log indicates that the system is not ready yet'
+ assert loggingEvent.formattedMessage == 'System is not ready yet'
+ }
def 'Module sync advised cm handles with #scenario.'() {
- given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(numberOfAdvisedCmHandles)
and: 'module sync utilities returns no failed (locked) cm handles'
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
}
def 'Module sync cm handles starts with no available threads.'() {
- given: 'module sync utilities returns a advise cm handles'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'module sync utilities returns a advise cm handles'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
}
def 'Module sync advised cm handle already handled by other thread.'() {
- given: 'module sync utilities returns an advised cm handle'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'the work queue can be locked'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> true
}
def 'Module sync with previous cm handle(s) left in work queue.'() {
- given: 'there is still a cm handle in the queue'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'there is still a cm handle in the queue'
moduleSyncWorkQueue.offer('ch-1')
when: 'module sync is started'
objectUnderTest.moduleSyncAdvisedCmHandles()
}
def 'Reset failed cm handles.'() {
- given: 'module sync utilities returns failed cm handles'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'module sync utilities returns failed cm handles'
def failedCmHandles = [new YangModelCmHandle()]
mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles
when: 'reset failed cm handles is started'
}
def 'Module Sync Locking.'() {
- given: 'module sync utilities returns an advised cm handle'
+ given: 'system is ready to accept traffic'
+ mockReadinessManager.isReady() >> true
+ and: 'module sync utilities returns an advised cm handle'
mockModuleOperationsUtils.getAdvisedCmHandleIds() >> createCmHandleIds(1)
and: 'can be locked is : #canLock'
mockCpsAndNcmpLock.tryLock('workQueueLock') >> canLock
def createCmHandleIds(numberOfCmHandles) {
return (numberOfCmHandles > 0) ? (1..numberOfCmHandles).collect { 'ch-'+it } : []
}
+
+ def getLoggingEvent() {
+ return logAppender.list[0]
+ }
}
import org.onap.cps.api.model.Dataspace
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
+import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import spock.lang.Specification
applicationContext.close()
}
- def 'Onboard subscription model via application started event.'() {
+ def 'Onboard subscription model via application ready event.'() {
given: 'dataspace is ready for use'
mockCpsDataspaceService.getDataspace(NCMP_DATASPACE_NAME) >> new Dataspace('')
- when: 'the application is started'
- objectUnderTest.onApplicationEvent(Mock(ApplicationStartedEvent))
+ when: 'the application is ready'
+ objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
then: 'the module service to create schema set is called once'
1 * mockCpsModuleService.createSchemaSet(NCMP_DATASPACE_NAME, 'cm-data-job-subscriptions', expectedYangResourcesToContentMap)
and: 'the admin service to create an anchor set is called once'
import org.onap.cps.api.model.ModuleDefinition
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
+import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.AnnotationConfigApplicationContext
mockCpsAdminService.getDataspace(NCMP_DATASPACE_NAME) >> new Dataspace('')
and: 'module revision does not exist'
mockCpsModuleService.getModuleDefinitionsByAnchorAndModule(_, _, _, _) >> Collections.emptyList()
- when: 'the application is started'
- objectUnderTest.onApplicationEvent(Mock(ApplicationStartedEvent))
+ when: 'the application is ready'
+ objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
then: 'the module service is used to create the new schema set from the correct resource'
1 * mockCpsModuleService.createSchemaSet(NCMP_DATASPACE_NAME, 'dmi-registry-2024-02-23', expectedPreviousYangResourceToContentMap)
and: 'No schema sets are being removed by the module service (yet)'
import org.onap.cps.init.actuator.ReadinessManager;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
@Slf4j
@RequiredArgsConstructor
private static final int EXIT_CODE_ON_ERROR = 1;
@Override
- public void onApplicationEvent(final ApplicationStartedEvent applicationStartedEvent) {
- final String modelLoaderName = this.getClass().getSimpleName();
- readinessManager.registerStartupProcess(modelLoaderName);
+ public void onApplicationEvent(final ApplicationReadyEvent applicationReadyEvent) {
try {
onboardOrUpgradeModel();
} catch (final Exception exception) {
log.error("Exiting application due to failure in onboarding model: {} ",
exception.getMessage());
- exitApplication(applicationStartedEvent);
+ exitApplication(applicationReadyEvent);
} finally {
- readinessManager.markStartupProcessComplete(modelLoaderName);
+ readinessManager.markStartupProcessComplete(getName());
}
}
}
}
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* Checks if the specified revision of a module is installed.
*/
return !moduleDefinitions.isEmpty();
}
+
Map<String, String> mapYangResourcesToContent(final String... resourceNames) {
final Map<String, String> yangResourceContentByName = new HashMap<>();
for (final String resourceName: resourceNames) {
}
}
- private void exitApplication(final ApplicationStartedEvent applicationStartedEvent) {
- SpringApplication.exit(applicationStartedEvent.getApplicationContext(), () -> EXIT_CODE_ON_ERROR);
+ private void exitApplication(final ApplicationReadyEvent applicationReadyEvent) {
+ SpringApplication.exit(applicationReadyEvent.getApplicationContext(), () -> EXIT_CODE_ON_ERROR);
}
}
package org.onap.cps.init;
-import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
-public interface ModelLoader extends ApplicationListener<ApplicationStartedEvent> {
+public interface ModelLoader extends ApplicationListener<ApplicationReadyEvent> {
@Override
- void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent);
+ void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent);
void onboardOrUpgradeModel();
+
+ String getName();
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init.actuator;
+
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.init.ModelLoader;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class ModelLoaderRegistrationOnStartup implements ApplicationListener<ApplicationStartedEvent> {
+
+ private final ReadinessManager readinessManager;
+ // Spring will insert all concrete model loader classes here.
+ private final List<ModelLoader> modelLoaders;
+
+
+ /**
+ * Register the model loaders as part of the Application Started Phase.
+ *
+ * @param applicationStartedEvent Application Started Event
+ */
+ @Override
+ public void onApplicationEvent(final ApplicationStartedEvent applicationStartedEvent) {
+
+ modelLoaders.forEach(modelLoader -> {
+ log.info("Registering ModelLoader {}", modelLoader.getName());
+ readinessManager.registerStartupProcess(modelLoader.getName());
+ });
+ }
+}
import lombok.RequiredArgsConstructor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
-@Component
+@Configuration
@RequiredArgsConstructor
-public class ReadinessHealthIndicator implements HealthIndicator {
+public class ReadinessStateHealthIndicatorConfig {
private final ReadinessManager readinessManager;
- @Override
- public Health health() {
- if (readinessManager.isReady()) {
- return Health.up()
- .withDetail("Startup Processes", "All startup processes completed")
- .build();
- }
- return Health.down()
- .withDetail("Startup Processes active", readinessManager.getStartupProcessesAsString())
- .build();
+ /**
+ * Overriding the default Readiness State Health Indicator.
+ *
+ * @return Health Status ( UP or DOWN )
+ */
+ @Bean("readinessStateHealthIndicator")
+ public HealthIndicator readinessStateHealthIndicator() {
+
+ return () -> {
+ if (readinessManager.isReady()) {
+ return Health.up().withDetail("Startup Processes", "All startup processes completed").build();
+ }
+ return Health.down().withDetail("Startup Processes active", readinessManager.getStartupProcessesAsString())
+ .build();
+ };
}
}
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
import org.springframework.boot.SpringApplication
+import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import spock.lang.Specification
loggingListAppender.stop()
}
- def 'Application started event triggers onboarding/upgrade'() {
- when: 'Application (started) event is triggered'
- objectUnderTest.onApplicationEvent(Mock(ApplicationStartedEvent))
+ def 'Application ready event triggers onboarding/upgrade'() {
+ when: 'Application (ready) event is triggered'
+ objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
then: 'the onboard/upgrade method is executed'
1 * objectUnderTest.onboardOrUpgradeModel()
}
- def 'Application started event handles startup exception'() {
+ def 'Application ready event handles startup exception'() {
given: 'a startup exception is thrown during model onboarding'
objectUnderTest.onboardOrUpgradeModel() >> { throw new ModelOnboardingException('test message','details are not logged') }
- when: 'Application (started) event is triggered'
- objectUnderTest.onApplicationEvent(new ApplicationStartedEvent(new SpringApplication(), null, applicationContext, null))
+ when: 'Application (ready) event is triggered'
+ objectUnderTest.onApplicationEvent(new ApplicationReadyEvent(new SpringApplication(), null, applicationContext, null))
then: 'the exception message is logged'
def logs = loggingListAppender.list.toString()
assert logs.contains('test message')
@Override
void onboardOrUpgradeModel() {
- // No operation needed for testing
+ // Not needed for testing
+ }
+
+ @Override
+ String getName() {
+ // Not needed for testing
}
}
}
import org.onap.cps.api.model.Dataspace
import org.onap.cps.init.actuator.ReadinessManager
import org.slf4j.LoggerFactory
+import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import spock.lang.Specification
loggingListAppender.stop()
}
- def 'Onboard subscription model via application started event.'() {
+ def 'Onboard subscription model via application ready event.'() {
given: 'dataspace is already present'
mockCpsDataspaceService.getAllDataspaces() >> [new Dataspace('test')]
when: 'the application is ready'
- objectUnderTest.onApplicationEvent(Mock(ApplicationStartedEvent))
+ objectUnderTest.onApplicationEvent(Mock(ApplicationReadyEvent))
then: 'the module service to create schema set is called once'
1 * mockCpsModuleService.createSchemaSet(CPS_DATASPACE_NAME, SCHEMASET_NAME, expectedYangResourcesToContents)
and: 'the anchor service to create an anchor set is called once'
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.init.actuator
+
+import org.onap.cps.init.ModelLoader
+import org.springframework.boot.context.event.ApplicationStartedEvent
+import spock.lang.Specification
+
+class ModelLoaderRegistrationOnStartupSpec extends Specification {
+
+ def mockReadinessManager = Mock(ReadinessManager)
+ def mockModelLoader1 = Mock(ModelLoader)
+ def mockModelLoader2 = Mock(ModelLoader)
+ def mockApplicationStartedEvent = Mock(ApplicationStartedEvent)
+
+ def objectUnderTest = new ModelLoaderRegistrationOnStartup(mockReadinessManager, [mockModelLoader1, mockModelLoader2])
+
+ def 'Register the model loaders via application started event'() {
+ given: 'model loaders with specific name'
+ mockModelLoader1.name >> 'my-loader-1'
+ mockModelLoader2.name >> 'my-loader-2'
+ when: ' application started event is fired'
+ objectUnderTest.onApplicationEvent(mockApplicationStartedEvent)
+ then: 'loaders are registered with the readiness managers'
+ 1 * mockReadinessManager.registerStartupProcess('my-loader-1')
+ 1 * mockReadinessManager.registerStartupProcess('my-loader-2')
+ }
+
+
+}
import spock.lang.Specification;
-class ReadinessHealthIndicatorSpec extends Specification {
+class ReadinessStateHealthIndicatorConfigSpec extends Specification {
def readinessManager = new ReadinessManager()
- def objectUnderTest = new ReadinessHealthIndicator(readinessManager)
+ def objectUnderTest = new ReadinessStateHealthIndicatorConfig(readinessManager)
def 'CPS service UP when all loaders are completed'() {
given: 'no loaders are in progress'
when: 'cps health check is invoked'
- def cpsHealth = objectUnderTest.health()
+ def cpsHealth = objectUnderTest.readinessStateHealthIndicator().getHealth(true)
then: 'health status is UP with following message'
assert cpsHealth.status.code == 'UP'
assert cpsHealth.details['Startup Processes'] == 'All startup processes completed'
given: 'any module loader is still running'
readinessManager.registerStartupProcess('someLoader')
when: 'cps health check is invoked'
- def cpsHealth = objectUnderTest.health()
+ def cpsHealth = objectUnderTest.readinessStateHealthIndicator().getHealth(true)
then: 'cps service is DOWN with loaders listed'
assert cpsHealth.status.code == 'DOWN'
def busyLoaders = cpsHealth.details['Startup Processes active']
readinessManager.registerStartupProcess('someLoader')
when: 'module loader completes'
readinessManager.markStartupProcessComplete('someLoader')
- def health = objectUnderTest.health()
+ def health = objectUnderTest.readinessStateHealthIndicator().getHealth(true)
then: 'cps health status flips to UP'
assert health.status.code == 'UP'
assert health.details['Startup Processes'] == 'All startup processes completed'
import org.onap.cps.api.CpsQueryService
import org.onap.cps.api.exceptions.DataspaceNotFoundException
import org.onap.cps.api.model.DataNode
+import org.onap.cps.init.actuator.ModelLoaderRegistrationOnStartup
+import org.onap.cps.init.actuator.ReadinessManager
import org.onap.cps.integration.DatabaseTestContainer
import org.onap.cps.integration.KafkaTestContainer
import org.onap.cps.ncmp.api.inventory.models.CmHandleState
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.annotation.ComponentScan
import org.springframework.data.jpa.repository.config.EnableJpaRepositories
-import org.springframework.test.context.ActiveProfiles
import org.springframework.test.web.servlet.MockMvc
import org.testcontainers.spock.Testcontainers
import spock.lang.Shared
@EnableJpaRepositories(basePackageClasses = [DataspaceRepository])
@ComponentScan(basePackages = ['org.onap.cps'])
@EntityScan('org.onap.cps.ri.models')
-@ActiveProfiles('module-sync-delayed')
abstract class CpsIntegrationSpecBase extends Specification {
static KafkaConsumer kafkaConsumer
@Autowired
AlternateIdMatcher alternateIdMatcher
+ @Autowired
+ ModelLoaderRegistrationOnStartup modelLoaderRegistrationOnStartup
+
+ @Autowired
+ ReadinessManager readinessManager
+
@Value('${ncmp.policy-executor.server.port:8080}')
private String policyServerPort;
mockPolicyServer.setDispatcher(policyDispatcher)
mockPolicyServer.start(Integer.valueOf(policyServerPort))
- DMI1_URL = String.format("http://%s:%s", mockDmiServer1.getHostName(), mockDmiServer1.getPort())
- DMI2_URL = String.format("http://%s:%s", mockDmiServer2.getHostName(), mockDmiServer2.getPort())
+ DMI1_URL = String.format('http://%s:%s', mockDmiServer1.getHostName(), mockDmiServer1.getPort())
+ DMI2_URL = String.format('http://%s:%s', mockDmiServer2.getHostName(), mockDmiServer2.getPort())
+
+ readinessManager.registerStartupProcess('Dummy process to prevent watchdogs processes starting during integration tests')
}
def cleanup() {
+++ /dev/null
-
-# ============LICENSE_START=======================================================
-# Copyright (C) 2024 Nordix Foundation.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-# ============LICENSE_END=========================================================
-test:
- ncmp:
- timers:
- advised-modules-sync:
- initial-delay-ms: 600000
-
timers:
advised-modules-sync:
- initial-delay-ms: 0
sleep-time-ms: 1000000
cm-handle-data-sync:
sleep-time-ms: 30000