From: emaclee Date: Tue, 4 Nov 2025 18:22:51 +0000 (+0000) Subject: Add Rejected scenario(s) for create subscription X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F88%2F142388%2F7;p=cps.git Add Rejected scenario(s) for create subscription - added consuming REJECTED create request from DMI - when REJECTED request is consumed, should update database and log details (dataNodeSelector, dataJobId, dmi details) - added ignoring when dataJobId is not unique with test Issue-ID: CPS-3005 Change-Id: If06c6ddfb3cf151ebf9485d43ba9d6ee5ef57287 Signed-off-by: emaclee --- diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java index 141c74a81e..d03fb4b0a3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java @@ -22,7 +22,6 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.dmi; import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED; -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED; import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent; import io.cloudevents.CloudEvent; @@ -42,9 +41,10 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class EventConsumer { - private final CmSubscriptionHandler cmSubscriptionHandler; private static final String CORRELATION_ID_SEPARATOR = "#"; + private final CmSubscriptionHandler cmSubscriptionHandler; + /** * Consume the Cm Notification Subscription response event from the dmi-plugin. * @@ -69,10 +69,7 @@ public class EventConsumer { if ("subscriptionCreateResponse".equals(eventType)) { final CmSubscriptionStatus cmSubscriptionStatus = getCmSubscriptionStatus(dmiOutEvent); - if (ACCEPTED.equals(cmSubscriptionStatus)) { - cmSubscriptionHandler.updateCmSubscriptionStatus( - subscriptionId, dmiPluginName, cmSubscriptionStatus); - } + cmSubscriptionHandler.updateCmSubscriptionStatus(subscriptionId, dmiPluginName, cmSubscriptionStatus); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java index 092bff7b10..f4fc169c54 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java @@ -20,6 +20,8 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp; +import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -42,7 +44,6 @@ import org.onap.cps.ncmp.impl.utils.JexParser; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; - @Service @RequiredArgsConstructor @Slf4j @@ -58,12 +59,14 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { @Override public void createSubscription(final DataSelector dataSelector, final String subscriptionId, final List dataNodeSelectors) { - for (final String dataNodeSelector : dataNodeSelectors) { - cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector); + if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) { + for (final String dataNodeSelector : dataNodeSelectors) { + cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector); + } + sendEventToDmis(subscriptionId, + cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId), + dataSelector, "subscriptionCreateRequest"); } - sendEventToDmis(subscriptionId, - cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId), - dataSelector, "subscriptionCreateRequest"); } @Override @@ -86,20 +89,37 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { final CmSubscriptionStatus cmSubscriptionStatus) { final List dataNodeSelectors = cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId); + final List rejectedDataNodeSelectors = new ArrayList<>(); for (final String dataNodeSelector : dataNodeSelectors) { final String cmHandleId = getCmHandleId(dataNodeSelector); if (cmHandleId == null) { - log.info("Failed to resolve cm handle ID for dataNodeSelector {}", dataNodeSelector); - } else { - final String resolvedDmiServiceName = getDmiServiceName(cmHandleId); - if (resolvedDmiServiceName.equals(dmiServiceName)) { - cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector, - cmSubscriptionStatus); + log.info("Ignoring dataNodeSelector={} because no matching CM Handle ID found", + dataNodeSelector); + continue; + } + final String resolvedDmiServiceName = getDmiServiceName(cmHandleId); + if (resolvedDmiServiceName.equals(dmiServiceName)) { + cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector, + cmSubscriptionStatus); + if (cmSubscriptionStatus.equals(REJECTED)) { + rejectedDataNodeSelectors.add(dataNodeSelector); } } + + } + if (!rejectedDataNodeSelectors.isEmpty()) { + logRejectedDataNodeSelectors(subscriptionId, dmiServiceName, rejectedDataNodeSelectors); } } + private static void logRejectedDataNodeSelectors(final String subscriptionId, final String dmiServiceName, + final List rejectedDataNodeSelectors) { + final String dataNodeSelectorAsString = + JexParser.toJsonExpressionsAsString(rejectedDataNodeSelectors); + log.info("DMI plugin {} rejected dataNodeSelector '{}' for dataJobId:{}", + dmiServiceName, dataNodeSelectorAsString, subscriptionId); + } + private void sendEventToDmis(final String subscriptionId, final List dataNodeSelectors, final DataSelector dataSelector, diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy index af0d322c84..75a9e5a5ae 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.dmi import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED +import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED import ch.qos.logback.classic.Level import ch.qos.logback.classic.Logger @@ -32,7 +33,6 @@ import io.cloudevents.core.builder.CloudEventBuilder import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.CmSubscriptionHandler -import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.slf4j.LoggerFactory import spock.lang.Specification @@ -55,30 +55,34 @@ class EventConsumerSpec extends Specification { ((Logger) LoggerFactory.getLogger(EventConsumer.class)).detachAndStopAllAppenders() } - def 'Consume subscription CREATE response with status ACCEPTED from DMI Plugin'() { + def 'Consume subscription #scenario CREATE response from DMI Plugin'() { given: 'a response event from DMI' - def jsonData = TestUtils.getResourceFileContent( - 'datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class) - def testCloudEventSent = CloudEventBuilder.v1() - .withData(objectMapper.writeValueAsBytes(testEventSent)) - .withId('random-uuid') - .withType('subscriptionCreateResponse') - .withSource(URI.create('myDmi')) - .withExtension('correlationid', 'sub-1#myDmi').build() - def consumerRecord = new ConsumerRecord('topic-name', 0, 0, 'event-key', testCloudEventSent) + def responseEvent = createResponseEventFromDmi('sub-1#myDmi', 'myDmi', statusAsString) + def consumerRecord = new ConsumerRecord('topic-name', 0, 0, 'event-key', responseEvent) when: 'the event is consumed' objectUnderTest.consumeDmiOutEvent(consumerRecord) then: 'an event is logged with level INFO' - def loggingEvent = getLoggingEvent() + def loggingEvent = logger.list.first() assert loggingEvent.level == Level.INFO and: 'the log indicates the task completed successfully' assert loggingEvent.formattedMessage == 'Consumed DMI subscription response event with details: | correlationId=sub-1#myDmi | eventType=subscriptionCreateResponse' and: 'the subscription handler is called to update status of subscription with correct details' - 1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', ACCEPTED) + 1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', cmSubscriptionStatus) + where: 'the following status is used' + scenario | statusAsString|| cmSubscriptionStatus + 'ACCEPTED'| 'ACCEPTED' || ACCEPTED + 'REJECTED'| 'REJECTED' || REJECTED } - def getLoggingEvent() { - return logger.list[0] + def createResponseEventFromDmi(correlationId, dmiPluginName, status) { + def jsonData ='{"data":{"statusCode":"1","statusMessage":"' + status + '"}}' + def dmiOutEventData = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class) + def dmiOutEvent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(dmiOutEventData)) + .withId('random-uuid') + .withType('subscriptionCreateResponse') + .withSource(URI.create(dmiPluginName)) + .withExtension('correlationid', correlationId).build() + return dmiOutEvent } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy index 80b5da2b09..b806258008 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy @@ -20,6 +20,9 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp +import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED +import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED + import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer @@ -31,8 +34,6 @@ import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher import org.onap.cps.ncmp.impl.utils.JexParser import spock.lang.Specification -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED - class CmSubscriptionHandlerImplSpec extends Specification { def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService) @@ -41,9 +42,24 @@ class CmSubscriptionHandlerImplSpec extends Specification { def mockInventoryPersistence = Mock(InventoryPersistence) def mockAlternateIdMatcher = Mock(AlternateIdMatcher) + void setup() { + mockCmSubscriptionPersistenceService.isNewSubscriptionId(!'existingId') >> true + } + def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, mockDmiInEventMapper, mockDmiInEventProducer, mockInventoryPersistence, mockAlternateIdMatcher) + def 'Attempt to create already existing subscription.'() { + given: 'the persistence service indicates the id is not new' + mockCmSubscriptionPersistenceService.isNewSubscriptionId('existingId') >> false + when: 'attempt to create the subscription' + objectUnderTest.createSubscription(new DataSelector(), 'existingId', ['/someDataNodeSelector']) + then: 'request is ignored and no method is invoked' + 0 * mockCmSubscriptionPersistenceService.add(*_) + and: 'no events are sent' + 0 * mockDmiInEventProducer.send(*_) + } + def 'Process subscription CREATE request for new target [non existing]'() { given: 'relevant subscription details' def mySubId = 'dataJobId' @@ -225,6 +241,23 @@ class CmSubscriptionHandlerImplSpec extends Specification { 'data node selector for other dmi' | 'someOtherDmi' || 0 } + def 'Log update when subscription status is REJECTED'() { + given: 'dmi service name and subscription id' + def myDmi = 'myDmi' + def mySubscriptionId = 'mySubscriptionId' + and: 'the persistence service returns all inactive data node selectors' + def myDataNodeSelectors = ['/parent[id=""]'].asList() + mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubscriptionId) >> myDataNodeSelectors + and: 'alternate id matcher always returns a cm handle id' + mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId' + and: 'the inventory persistence service returns a yang model with the given dmi service name' + mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: myDmi) + when: 'update subscription status is called with status=REJECTED' + objectUnderTest.updateCmSubscriptionStatus(mySubscriptionId, myDmi, REJECTED) + then: 'the persistence service to update subscription status called with REJECTED for matching dmi name' + 1 * mockCmSubscriptionPersistenceService.updateCmSubscriptionStatus('/parent[id=""]', REJECTED) + } + def getFdn(dataNodeSelector) { return JexParser.extractFdnPrefix(dataNodeSelector).orElse("") diff --git a/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json b/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json deleted file mode 100644 index 1ebee57154..0000000000 --- a/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "data": { - "statusCode": "1", - "statusMessage": "ACCEPTED" - } -} \ No newline at end of file