Add Rejected scenario(s) for create subscription 88/142388/7
authoremaclee <lee.anjella.macabuhay@est.tech>
Tue, 4 Nov 2025 18:22:51 +0000 (18:22 +0000)
committeremaclee <lee.anjella.macabuhay@est.tech>
Thu, 20 Nov 2025 11:19:48 +0000 (11:19 +0000)
- added consuming REJECTED create request from DMI
- when REJECTED request is consumed, should update database and
  log details (dataNodeSelector, dataJobId, dmi details)
- added ignoring when dataJobId is not unique with test

Issue-ID: CPS-3005
Change-Id: If06c6ddfb3cf151ebf9485d43ba9d6ee5ef57287
Signed-off-by: emaclee <lee.anjella.macabuhay@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json [deleted file]

index 141c74a..d03fb4b 100644 (file)
@@ -22,7 +22,6 @@
 package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
 
 import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED;
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
 
 import io.cloudevents.CloudEvent;
@@ -42,9 +41,10 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class EventConsumer {
 
-    private final CmSubscriptionHandler cmSubscriptionHandler;
     private static final String CORRELATION_ID_SEPARATOR = "#";
 
+    private final CmSubscriptionHandler cmSubscriptionHandler;
+
     /**
      * Consume the Cm Notification Subscription response event from the dmi-plugin.
      *
@@ -69,10 +69,7 @@ public class EventConsumer {
 
             if ("subscriptionCreateResponse".equals(eventType)) {
                 final CmSubscriptionStatus cmSubscriptionStatus = getCmSubscriptionStatus(dmiOutEvent);
-                if (ACCEPTED.equals(cmSubscriptionStatus)) {
-                    cmSubscriptionHandler.updateCmSubscriptionStatus(
-                            subscriptionId, dmiPluginName, cmSubscriptionStatus);
-                }
+                cmSubscriptionHandler.updateCmSubscriptionStatus(subscriptionId, dmiPluginName, cmSubscriptionStatus);
             }
         }
     }
index 092bff7..f4fc169 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
 
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -42,7 +44,6 @@ import org.onap.cps.ncmp.impl.utils.JexParser;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
 
-
 @Service
 @RequiredArgsConstructor
 @Slf4j
@@ -58,12 +59,14 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
     @Override
     public void createSubscription(final DataSelector dataSelector,
                                    final String subscriptionId, final List<String> dataNodeSelectors) {
-        for (final String dataNodeSelector : dataNodeSelectors) {
-            cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector);
+        if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) {
+            for (final String dataNodeSelector : dataNodeSelectors) {
+                cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector);
+            }
+            sendEventToDmis(subscriptionId,
+                    cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId),
+                    dataSelector, "subscriptionCreateRequest");
         }
-        sendEventToDmis(subscriptionId,
-                cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId),
-                dataSelector, "subscriptionCreateRequest");
     }
 
     @Override
@@ -86,20 +89,37 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
                                            final CmSubscriptionStatus cmSubscriptionStatus) {
         final List<String> dataNodeSelectors =
                 cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
+        final List<String> rejectedDataNodeSelectors = new ArrayList<>();
         for (final String dataNodeSelector : dataNodeSelectors) {
             final String cmHandleId = getCmHandleId(dataNodeSelector);
             if (cmHandleId == null) {
-                log.info("Failed to resolve cm handle ID for dataNodeSelector {}", dataNodeSelector);
-            } else {
-                final String resolvedDmiServiceName = getDmiServiceName(cmHandleId);
-                if (resolvedDmiServiceName.equals(dmiServiceName)) {
-                    cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector,
-                            cmSubscriptionStatus);
+                log.info("Ignoring dataNodeSelector={} because no matching CM Handle ID found",
+                        dataNodeSelector);
+                continue;
+            }
+            final String resolvedDmiServiceName = getDmiServiceName(cmHandleId);
+            if (resolvedDmiServiceName.equals(dmiServiceName)) {
+                cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector,
+                        cmSubscriptionStatus);
+                if (cmSubscriptionStatus.equals(REJECTED)) {
+                    rejectedDataNodeSelectors.add(dataNodeSelector);
                 }
             }
+
+        }
+        if (!rejectedDataNodeSelectors.isEmpty()) {
+            logRejectedDataNodeSelectors(subscriptionId, dmiServiceName, rejectedDataNodeSelectors);
         }
     }
 
+    private static void logRejectedDataNodeSelectors(final String subscriptionId, final String dmiServiceName,
+                                                     final List<String> rejectedDataNodeSelectors) {
+        final String dataNodeSelectorAsString =
+                JexParser.toJsonExpressionsAsString(rejectedDataNodeSelectors);
+        log.info("DMI plugin {} rejected dataNodeSelector '{}' for dataJobId:{}",
+                dmiServiceName, dataNodeSelectorAsString, subscriptionId);
+    }
+
     private void sendEventToDmis(final String subscriptionId,
                                  final List<String> dataNodeSelectors,
                                  final DataSelector dataSelector,
index af0d322..75a9e5a 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
 
 import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
 
 import ch.qos.logback.classic.Level
 import ch.qos.logback.classic.Logger
@@ -32,7 +33,6 @@ import io.cloudevents.core.builder.CloudEventBuilder
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent
 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.CmSubscriptionHandler
-import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
 import org.slf4j.LoggerFactory
 import spock.lang.Specification
@@ -55,30 +55,34 @@ class EventConsumerSpec extends Specification {
         ((Logger) LoggerFactory.getLogger(EventConsumer.class)).detachAndStopAllAppenders()
     }
 
-    def 'Consume subscription CREATE response with status ACCEPTED from DMI Plugin'() {
+    def 'Consume subscription #scenario CREATE response from DMI Plugin'() {
         given: 'a response event from DMI'
-            def jsonData = TestUtils.getResourceFileContent(
-                'datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json')
-            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class)
-            def testCloudEventSent = CloudEventBuilder.v1()
-                .withData(objectMapper.writeValueAsBytes(testEventSent))
-                .withId('random-uuid')
-                .withType('subscriptionCreateResponse')
-                .withSource(URI.create('myDmi'))
-                .withExtension('correlationid', 'sub-1#myDmi').build()
-            def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+            def responseEvent = createResponseEventFromDmi('sub-1#myDmi', 'myDmi', statusAsString)
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', responseEvent)
         when: 'the event is consumed'
             objectUnderTest.consumeDmiOutEvent(consumerRecord)
         then: 'an event is logged with level INFO'
-            def loggingEvent = getLoggingEvent()
+            def loggingEvent = logger.list.first()
             assert loggingEvent.level == Level.INFO
         and: 'the log indicates the task completed successfully'
             assert loggingEvent.formattedMessage == 'Consumed DMI subscription response event with details: | correlationId=sub-1#myDmi | eventType=subscriptionCreateResponse'
         and:  'the subscription handler is called to update status of subscription with correct details'
-            1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', ACCEPTED)
+            1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', cmSubscriptionStatus)
+        where: 'the following status is used'
+            scenario  | statusAsString|| cmSubscriptionStatus
+            'ACCEPTED'| 'ACCEPTED'    || ACCEPTED
+            'REJECTED'| 'REJECTED'    || REJECTED
     }
 
-    def getLoggingEvent() {
-        return logger.list[0]
+    def createResponseEventFromDmi(correlationId, dmiPluginName, status) {
+        def jsonData ='{"data":{"statusCode":"1","statusMessage":"' + status + '"}}'
+        def dmiOutEventData = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class)
+        def dmiOutEvent = CloudEventBuilder.v1()
+            .withData(objectMapper.writeValueAsBytes(dmiOutEventData))
+            .withId('random-uuid')
+            .withType('subscriptionCreateResponse')
+            .withSource(URI.create(dmiPluginName))
+            .withExtension('correlationid', correlationId).build()
+        return dmiOutEvent
     }
 }
index 80b5da2..b806258 100644 (file)
@@ -20,6 +20,9 @@
 
 package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
 
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
+
 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper
 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer
@@ -31,8 +34,6 @@ import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher
 import org.onap.cps.ncmp.impl.utils.JexParser
 import spock.lang.Specification
 
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-
 class CmSubscriptionHandlerImplSpec extends Specification {
 
     def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
@@ -41,9 +42,24 @@ class CmSubscriptionHandlerImplSpec extends Specification {
     def mockInventoryPersistence = Mock(InventoryPersistence)
     def mockAlternateIdMatcher = Mock(AlternateIdMatcher)
 
+    void setup() {
+        mockCmSubscriptionPersistenceService.isNewSubscriptionId(!'existingId') >> true
+    }
+
     def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, mockDmiInEventMapper,
             mockDmiInEventProducer, mockInventoryPersistence, mockAlternateIdMatcher)
 
+    def 'Attempt to create already existing subscription.'() {
+        given: 'the persistence service indicates the id is not new'
+            mockCmSubscriptionPersistenceService.isNewSubscriptionId('existingId') >> false
+        when: 'attempt to create the subscription'
+            objectUnderTest.createSubscription(new DataSelector(), 'existingId', ['/someDataNodeSelector'])
+        then: 'request is ignored and no method is invoked'
+            0 * mockCmSubscriptionPersistenceService.add(*_)
+        and: 'no events are sent'
+            0 * mockDmiInEventProducer.send(*_)
+    }
+
     def 'Process subscription CREATE request for new target [non existing]'() {
         given: 'relevant subscription details'
             def mySubId = 'dataJobId'
@@ -225,6 +241,23 @@ class CmSubscriptionHandlerImplSpec extends Specification {
             'data node selector for other dmi' | 'someOtherDmi' || 0
     }
 
+    def 'Log update when subscription status is REJECTED'() {
+        given: 'dmi service name and subscription id'
+            def myDmi = 'myDmi'
+            def mySubscriptionId = 'mySubscriptionId'
+        and: 'the persistence service returns all inactive data node selectors'
+            def myDataNodeSelectors = ['/parent[id=""]'].asList()
+            mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubscriptionId) >> myDataNodeSelectors
+        and: 'alternate id matcher always returns a cm handle id'
+            mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId'
+        and: 'the inventory persistence service returns a yang model with the given dmi service name'
+            mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: myDmi)
+        when: 'update subscription status is called with status=REJECTED'
+            objectUnderTest.updateCmSubscriptionStatus(mySubscriptionId, myDmi, REJECTED)
+        then: 'the persistence service to update subscription status called with REJECTED for matching dmi name'
+            1 * mockCmSubscriptionPersistenceService.updateCmSubscriptionStatus('/parent[id=""]', REJECTED)
+    }
+
 
     def getFdn(dataNodeSelector) {
         return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
diff --git a/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json b/cps-ncmp-service/src/test/resources/datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json
deleted file mode 100644 (file)
index 1ebee57..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-{
-  "data": {
-    "statusCode": "1",
-    "statusMessage": "ACCEPTED"
-  }
-}
\ No newline at end of file