topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}\r
events:\r
topic: ${NCMP_EVENTS_TOPIC:ncmp-events}\r
+\r
notification:\r
+ enabled: true\r
data-updated:\r
- enabled: false\r
topic: ${CPS_CHANGE_EVENT_TOPIC:cps.data-updated-events}\r
filters:\r
enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""}\r
async:\r
- enabled: false\r
executor:\r
core-pool-size: 2\r
max-pool-size: 10\r
import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.PATCH;
import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.UPDATE;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
private final NcmpRestInputMapper ncmpRestInputMapper;
private final CmHandleStateMapper cmHandleStateMapper;
private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor;
-
@Value("${notification.async.executor.time-out-value-in-ms:2000}")
private int timeOutInMilliSeconds;
+ @Value("${notification.enabled:true}")
+ private boolean asyncEnabled;
/**
* Get resource data from operational datastore.
final @NotNull @Valid String resourceIdentifier,
final @Valid String optionsParamInQuery,
final @Valid String topicParamInQuery) {
- if (isValidTopic(topicParamInQuery)) {
+ if (asyncEnabled && isValidTopic(topicParamInQuery)) {
final String requestId = UUID.randomUUID().toString();
+ log.info("Received Async passthrough-operational request with id {}", requestId);
cpsNcmpTaskExecutor.executeTask(() ->
- networkCmProxyDataService.getResourceDataOperationalForCmHandle(
- cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
- requestId
- ), timeOutInMilliSeconds
+ networkCmProxyDataService.getResourceDataOperationalForCmHandle(
+ cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId
+ ), timeOutInMilliSeconds
);
- return acknowledgeAsyncRequest(requestId);
+ return ResponseEntity.ok(Map.of("requestId", requestId));
+ } else {
+ log.warn("Asynchronous messaging is currently disabled for passthrough-operational."
+ + " Will use synchronous operation.");
}
final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(
final @NotNull @Valid String resourceIdentifier,
final @Valid String optionsParamInQuery,
final @Valid String topicParamInQuery) {
- if (isValidTopic(topicParamInQuery)) {
- final String resourceDataRequestId = UUID.randomUUID().toString();
+ if (asyncEnabled && isValidTopic(topicParamInQuery)) {
+ final String requestId = UUID.randomUUID().toString();
+ log.info("Received Async passthrough-running request with id {}", requestId);
cpsNcmpTaskExecutor.executeTask(() ->
networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
- cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
- resourceDataRequestId
+ cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId
), timeOutInMilliSeconds
);
- return acknowledgeAsyncRequest(resourceDataRequestId);
+ return ResponseEntity.ok(Map.of("requestId", requestId));
+ } else {
+ log.warn("Asynchronous messaging is currently disabled for passthrough-running."
+ + " Will use synchronous operation.");
}
final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic");
}
- private ResponseEntity<Object> acknowledgeAsyncRequest(final String requestId) {
- final Map<String, Object> acknowledgeData = new HashMap<>(1);
- acknowledgeData.put("requestId", requestId);
- return ResponseEntity.ok(acknowledgeData);
- }
-
}
# ============LICENSE_START=======================================================
-# Copyright (C) 2021 Nordix Foundation
+# Copyright (C) 2021-2022 Nordix Foundation
# Modifications Copyright (C) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
ncmp-inventory-base-path: /ncmpInventory
notification:
+ enabled: true
async:
executor:
time-out-value-in-ms: 2000
\ No newline at end of file
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class NcmpAsyncRequestResponseEventConsumer {
private final NcmpAsyncRequestResponseEventProducer ncmpAsyncRequestResponseEventProducer;
@Value("${app.ncmp.events.topic:ncmp-events}")
private String topicName;
+ @Value("${notification.enabled:true}")
+ private boolean notificationsEnabled;
+
/**
* Publish the NcmpEvent to the public topic.
*
* @param cmHandleId Cm Handle Id
*/
public void publishNcmpEvent(final String cmHandleId) {
-
- final NcmpServiceCmHandle ncmpServiceCmHandle = YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle(
- inventoryPersistence.getYangModelCmHandle(cmHandleId));
- final NcmpEvent ncmpEvent = ncmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmHandle);
- ncmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent);
-
+ if (notificationsEnabled) {
+ final NcmpServiceCmHandle ncmpServiceCmHandle =
+ YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle(
+ inventoryPersistence.getYangModelCmHandle(cmHandleId));
+ final NcmpEvent ncmpEvent = ncmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmHandle);
+ ncmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent);
+ } else {
+ log.debug("Notifications disabled.");
+ }
}
}
* @return CompositeState
*/
public CompositeStateBuilder fromDataNode(final DataNode dataNode) {
- this.cmHandleState = CmHandleState.valueOf((String) dataNode.getLeaves()
- .get("cm-handle-state"));
+ this.cmHandleState = CmHandleState.valueOf((String) dataNode.getLeaves()
+ .get("cm-handle-state"));
+ this.lastUpdatedTime = (String) dataNode.getLeaves().get("last-update-time");
for (final DataNode stateChildNode : dataNode.getChildDataNodes()) {
if (stateChildNode.getXpath().endsWith("/lock-reason")) {
this.lockReason = getLockReason(stateChildNode);
* Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'.
*/
@Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
- public void executeLockedMisbehavingCmHandlePoll() {
+ public void executeLockedCmHandlePoll() {
final List<YangModelCmHandle> lockedMisbehavingCmHandles = syncUtils.getLockedMisbehavingYangModelCmHandles();
- for (final YangModelCmHandle lockedMisbehavingModelCmHandle : lockedMisbehavingCmHandles) {
- final CompositeState compositeState = lockedMisbehavingModelCmHandle.getCompositeState();
- setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState);
- log.debug("Locked misbehaving cm handle {} is being recycled", lockedMisbehavingModelCmHandle.getId());
- inventoryPersistence.saveCmHandleState(lockedMisbehavingModelCmHandle.getId(), compositeState);
+ for (final YangModelCmHandle moduleSyncFailedCmHandle : lockedMisbehavingCmHandles) {
+ final CompositeState compositeState = moduleSyncFailedCmHandle.getCompositeState();
+ final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
+ if (isReadyForRetry) {
+ setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState);
+ log.debug("Locked misbehaving cm handle {} is being recycled", moduleSyncFailedCmHandle.getId());
+ inventoryPersistence.saveCmHandleState(moduleSyncFailedCmHandle.getId(), compositeState);
+ }
}
}
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import java.security.SecureRandom;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
.lockReasonCategory(lockReasonCategory).build());
}
+
+ /**
+ * Check if the retry mechanism should attempt to unlock the cm handle based on the last update time.
+ *
+ * @param compositeState the composite state currently in the locked state
+ * @return if the retry mechanism should be attempted
+ */
+ public boolean isReadyForRetry(final CompositeState compositeState) {
+ int timeUntilNextAttempt = 1;
+ final OffsetDateTime time =
+ OffsetDateTime.parse(compositeState.getLastUpdateTime(),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+ final Matcher matcher = retryAttemptPattern.matcher(compositeState.getLockReason().getDetails());
+ if (matcher.find()) {
+ timeUntilNextAttempt = (int) Math.pow(2, Integer.parseInt(matcher.group(1)));
+ } else {
+ log.debug("First Attempt: no current attempts found.");
+ }
+ final int timeSinceLastAttempt = (int) Duration.between(time, OffsetDateTime.now()).toMinutes();
+ return timeSinceLastAttempt > timeUntilNextAttempt;
+ }
+
/**
* Get the Resourece Data from Node through DMI Passthrough service.
*
def mockInventoryPersistence = Mock(InventoryPersistence)
def mockNcmpEventsPublisher = Mock(NcmpEventsPublisher)
- def mockNcmpEventsMapper = Mock(NcmpEventsCreator)
+ def mockNcmpEventsCreator = Mock(NcmpEventsCreator)
- def objectUnderTest = new NcmpEventsService(mockInventoryPersistence, mockNcmpEventsPublisher, mockNcmpEventsMapper)
+ def objectUnderTest = new NcmpEventsService(mockInventoryPersistence, mockNcmpEventsPublisher, mockNcmpEventsCreator)
- def 'Create and Publish event for #operation'() {
+ def 'Create and Publish ncmp event where events are #scenario'() {
given: 'a cm handle id and operation and responses are mocked'
mockResponses('test-cm-handle-id', 'test-topic')
+ and: 'notifications enabled is #notificationsEnabled'
+ objectUnderTest.notificationsEnabled = notificationsEnabled
when: 'service is called to publish ncmp event'
objectUnderTest.publishNcmpEvent('test-cm-handle-id')
- then: 'no exception is thrown'
- noExceptionThrown()
+ then: 'creator is called #expectedTimesMethodCalled times'
+ expectedTimesMethodCalled * mockNcmpEventsCreator.populateNcmpEvent('test-cm-handle-id', _)
+ and: 'publisher is called #expectedTimesMethodCalled times'
+ expectedTimesMethodCalled * mockNcmpEventsPublisher.publishEvent(*_)
+ where: 'the following values are used'
+ scenario | notificationsEnabled|| expectedTimesMethodCalled
+ 'enabled' | true || 1
+ 'disabled' | false || 0
}
def mockResponses(cmHandleId, topicName) {
def ncmpServiceCmhandle = YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle(yangModelCmHandle)
mockInventoryPersistence.getYangModelCmHandle(cmHandleId) >> yangModelCmHandle
- mockNcmpEventsMapper.populateNcmpEvent(cmHandleId, ncmpServiceCmhandle) >> ncmpEvent
+ mockNcmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmhandle) >> ncmpEvent
mockNcmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent) >> {}
}
-
}
}
- def 'Schedule a Cm-Handle Sync for LOCKED with reason LOCKED_MISBEHAVING Cm-Handles '() {
+ def 'Schedule a Cm-Handle Sync for LOCKED with reason LOCKED_MISBEHAVING Cm-Handles with #scenario'() {
given: 'cm handles in an locked state'
def compositeState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED)
- .withLockReason(LockReasonCategory.LOCKED_MISBEHAVING, '').build()
+ .withLockReason(LockReasonCategory.LOCKED_MISBEHAVING, '').withLastUpdatedTimeNow().build()
def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState)
and: 'sync utilities return a cm handle twice'
mockSyncUtils.getLockedMisbehavingYangModelCmHandles() >> [yangModelCmHandle, yangModelCmHandle]
+ and: 'inventory persistence returns the composite state of the cm handle'
+ mockInventoryPersistence.getCmHandleState(yangModelCmHandle.getId()) >> compositeState
+ and: 'sync utils retry locked cm handle returns #isReadyForRetry'
+ mockSyncUtils.isReadyForRetry(compositeState) >>> isReadyForRetry
when: 'module sync poll is executed'
- objectUnderTest.executeLockedMisbehavingCmHandlePoll()
+ objectUnderTest.executeLockedCmHandlePoll()
then: 'the first cm handle is updated to state "ADVISED" from "READY"'
- 2 * mockInventoryPersistence.saveCmHandleState(yangModelCmHandle.id, compositeState)
+ expectedNumberOfInvocationsToSaveCmHandleState * mockInventoryPersistence.saveCmHandleState(yangModelCmHandle.id, compositeState)
+ where:
+ scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState
+ 'retry locked cm handle once' | [true, false] || 1
+ 'retry locked cm handle twice' | [true, true] || 2
+ 'do not retry locked cm handle' | [false, false] || 0
}
}
import org.onap.cps.ncmp.api.impl.operations.DmiOperations
import org.onap.cps.ncmp.api.inventory.CmHandleState
import org.onap.cps.ncmp.api.inventory.CompositeState
+import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.inventory.LockReasonCategory
import org.onap.cps.ncmp.api.inventory.SyncState
import spock.lang.Shared
import spock.lang.Specification
+import java.time.OffsetDateTime
+import java.time.format.DateTimeFormatter
+
class SyncUtilsSpec extends Specification{
def mockInventoryPersistence = Mock(InventoryPersistence)
def objectUnderTest = new SyncUtils(mockInventoryPersistence, mockDmiDataOperations, jsonObjectMapper)
+ @Shared
+ def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(OffsetDateTime.now())
+
@Shared
def dataNode = new DataNode(leaves: ['id': 'cm-handle-123'])
result[0].id == 'cm-handle-123'
}
+ def 'Retry Locked Cm-Handle where the last update time is #scenario'() {
+ when: 'retry locked cm handle is invoked'
+ def result = objectUnderTest.isReadyForRetry(new CompositeStateBuilder()
+ .withLockReason(LockReasonCategory.LOCKED_MISBEHAVING, details)
+ .withLastUpdatedTime(lastUpdateTime).build())
+ then: 'result returns #expectedResult'
+ result == expectedResult
+ where:
+ scenario | lastUpdateTime | details || expectedResult
+ 'is the first attempt' | '1900-01-01T00:00:00.000+0100' | 'First Attempt' || true
+ 'is greater than one minute' | '1900-01-01T00:00:00.000+0100' | 'Attempt #1 failed:' || true
+ 'is less than eight minutes' | formattedDateAndTime | 'Attempt #3 failed:' || false
+ }
+
+
def 'Get a Cm-Handle where Operational Sync state is UnSynchronized and Cm-handle state is READY and #scenario'() {
given: 'the inventory persistence service returns a collection of data nodes'
mockInventoryPersistence.getCmHandlesByOperationalSyncState(SyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes
/*
* ============LICENSE_START=======================================================
* Copyright (c) 2021 Bell Canada.
+ * Modifications Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@EnableAsync
@Configuration
-@ConditionalOnProperty(name = "notification.async.enabled", havingValue = "true", matchIfMissing = false)
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
@ConfigurationProperties("notification.async.executor")
@Validated
@Setter
import java.util.Map;
import javax.validation.constraints.NotNull;
import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
@NotNull
private String topic;
private Map<String, String> filters = Collections.emptyMap();
- @NotNull
- private boolean enabled = false;
+
+ @Value("${notification.enabled:true}")
+ private boolean enabled;
}
# ============LICENSE_END=========================================================
notification:
+ enabled: true
data-updated:
filters:
enabled-dataspaces: ".*-published,.*-important"
- enabled: true
topic: cps-event
async:
- enabled: true
executor:
core-pool-size: 2
max-pool-size: 10
cp $WORKSPACE/../docker-compose/*.yml $WORKSPACE/archives/dc-cps
cd $WORKSPACE/archives/dc-cps
-# download docker-compose of a required version (1.25.0 supports configuration of version 3.7)
-curl -L https://github.com/docker/compose/releases/download/1.25.0/docker-compose-`uname -s`-`uname -m` > docker-compose
+curl -L https://github.com/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` > docker-compose
chmod +x docker-compose
+docker-compose version
# start CPS/NCMP, DMI, and PostgreSQL containers with docker compose
-./docker-compose up -d
+docker-compose up -d
###################### setup sdnc #######################################
source $WORKSPACE/plans/cps/sdnc/sdnc_setup.sh
# limitations under the License.
# ============LICENSE_END=========================================================
-version: "3.7"
+version: '3.3'
services:
- ### Services cps-service, cps-ncmp, zookeeper and kafka are commented below, these
- ### services can be un-commented and used on need to use basis. Only minimal
- ### services will run (dbpostgresql, cps-and-ncmp and ncmp-dmi-plugin) by default.
- #cps-standalone:
- # container_name: cps-service
- # image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/cps-service:${VERSION:-latest}
- # ports:
- # - "8881:8080"
- # - "8887:8081"
- # environment:
- # CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser}
- # CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!}
- # DB_HOST: dbpostgresql
- # DB_USERNAME: ${DB_USERNAME:-cps}
- # DB_PASSWORD: ${DB_PASSWORD:-cps}
- # #KAFKA_BOOTSTRAP_SERVER: kafka:9092
- # #notification.data-updated.enabled: 'true'
- # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
- # restart: unless-stopped
- # depends_on:
- # - dbpostgresql
-
- #ncmp-standalone:
- # container_name: cps-ncmp
- # image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/cps-ncmp:${VERSION:-latest}
- # ports:
- # - "8882:8080"
- # - "8887:8081"
- # environment:
- # CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser}
- # CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!}
- # DB_HOST: dbpostgresql
- # DB_USERNAME: ${DB_USERNAME:-cps}
- # DB_PASSWORD: ${DB_PASSWORD:-cps}
- # DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
- # DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
- # #KAFKA_BOOTSTRAP_SERVER: kafka:9092
- # #notification.data-updated.enabled: 'true'
- # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
- # restart: unless-stopped
- # depends_on:
- # - dbpostgresql
-
- # zookeeper:
- # image: confluentinc/cp-zookeeper:6.2.1
- # environment:
- # ZOOKEEPER_CLIENT_PORT: 2181
- # ZOOKEEPER_TICK_TIME: 2000
- # ports:
- # - 22181:2181
- #
- # kafka:
- # image: confluentinc/cp-kafka:6.2.1
- # depends_on:
- # - zookeeper
- # ports:
- # - 29092:29092
- # environment:
- # KAFKA_BROKER_ID: 1
- # KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- # KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- # KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- # KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- # KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ ### docker-compose up -d -> run ALL services ###
+ ### to disable notifications make notification.enabled to false & comment out kafka/zookeeper services ###
dbpostgresql:
container_name: dbpostgresql
DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
KAFKA_BOOTSTRAP_SERVER: kafka:9092
- notification.data-updated.enabled: 'true'
+ notification.enabled: 'true'
+ notification.async.executor.time-out-value-in-ms: 2000
NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
TIMERS_ADVISED-MODULES-SYNC_SLEEP-TIME-MS: 2000
restart: unless-stopped
| config.additional. | Kafka topic to publish to cps-temporal | ``cps.data-updated-events`` |
| notification.data-updated.topic | | |
+---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
-| config.additional. | If notification from cps-core to cps-temporal is enabled or not. | ``true`` |
-| notification.data-updated.enabled | If this is set to false, then the config.publisher properties could be skipped. | |
-+---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
| config.additional. | Dataspaces to be enabled for publishing events to cps-temporal | ```` |
| notification.data-updated.filters. | | |
| enabled-dataspaces | | |
+---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
-| config.additional. | If notifications should be processed in synchronous or asynchronous manner | ``false`` |
-| notification.async.enabled | | |
+| config.additional. | If asynchronous messaging, user notifications, and updated event persistence should be enabled | ``true`` |
+| notification.enabled | | |
+---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
| config.additional. | Core pool size in asynchronous execution of notification. | ``2`` |
| notification.async.executor. | | |