Integration tests for delete subscription 14/142314/8
authorshikha0203 <shivani.khare@est.tech>
Thu, 23 Oct 2025 14:01:32 +0000 (15:01 +0100)
committershikha0203 <shivani.khare@est.tech>
Mon, 10 Nov 2025 13:01:30 +0000 (13:01 +0000)
- Deleting a subscription
- for delete event, the subscription should no longer exist
- deleting a non-existent subscription should not throw a system error

- Use Unique IDs for most tests
- Cleaned descriptions

Issue-ID: CPS-3016
Change-Id: Iafb0b86cf6256ff4ac8aaca68a3a7242fbf72462
Signed-off-by: shikha0203 <shivani.khare@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy

index 55553dc..cfa8ba6 100644 (file)
@@ -56,10 +56,14 @@ public class NcmpInEventConsumer {
 
         log.info("Consumed subscription event with details: | dataJobId={} | eventType={}", dataJobId, eventType);
 
-        switch (eventType) {
-            case "dataJobCreated" -> handleCreate(dataJobId, dataJob);
-            case "dataJobDeleted" -> cmSubscriptionHandler.deleteSubscription(dataJobId);
-            default -> log.warn("Unknown eventType={} for dataJobId={}", eventType, dataJobId);
+        try {
+            switch (eventType) {
+                case "dataJobCreated" -> handleCreate(dataJobId, dataJob);
+                case "dataJobDeleted" -> cmSubscriptionHandler.deleteSubscription(dataJobId);
+                default -> log.warn("Unknown eventType={} for dataJobId={}", eventType, dataJobId);
+            }
+        } finally {
+            log.info("NCMP In Event has been Processed for dataJobId={}", dataJobId);
         }
     }
 
index f19a13f..d122c0e 100644 (file)
@@ -22,11 +22,11 @@ package org.onap.cps.integration.functional.ncmp.datajobs.subscription
 
 import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
 
+import ch.qos.logback.classic.Logger
 import ch.qos.logback.classic.spi.ILoggingEvent
 import ch.qos.logback.core.read.ListAppender
 import io.cloudevents.core.builder.CloudEventBuilder
 import io.cloudevents.kafka.CloudEventSerializer
-import io.cloudevents.kafka.CloudEventDeserializer
 import java.nio.charset.StandardCharsets
 import java.time.Duration
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -57,7 +57,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def testResponseProducer
 
     def listAppender = new ListAppender<ILoggingEvent>()
-    def logger
+    def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
 
     def setup() {
         registerCmHandlesForSubscriptions()
@@ -67,83 +67,82 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
         dmiInConsumer.poll(Duration.ofMillis(500))
         testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
         testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
-        logger = LoggerFactory.getLogger(NcmpInEventConsumer)
         listAppender.start()
         logger.addAppender(listAppender)
     }
 
     def cleanup() {
+        logger.detachAndStopAllAppenders()
         dmiInConsumer.unsubscribe()
         dmiInConsumer.close()
         testRequestProducer.close()
         testResponseProducer.close()
         kafkaTestContainer.close()
         deregisterCmHandles('dmi-0', ['cmHandle0'])
-        deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2'])
+        deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2', 'cmHandle5'])
         deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4'])
     }
 
     def 'Create subscription and send to multiple DMIs'() {
-        given: 'a data node selector on DMI-1'
+        given: 'data node selector with two paths on DMI-1'
             def dmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
-        and: 'a data node selector on DMI-2'
+        and: 'data node selector with one path on DMI-2'
             def dmi2DataNodeSelector = '/parent[id=\\\"3\\\"]/child'
         and: 'an event payload'
             def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
-            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId', eventDataNodeSelector)
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', eventDataNodeSelector)
         when: 'a subscription create request is sent'
-            sendSubscriptionCreateRequest(subscriptionTopic, 'key', eventPayload)
+            sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01')
         then: 'log shows event is consumed by ncmp'
             def messages = listAppender.list*.formattedMessage
-            messages.any { msg -> msg.contains('myDataJobId') && msg.contains('dataJobCreated')}
+            messages.any { msg -> msg.contains('myDataJobId01') && msg.contains('dataJobCreated') }
         and: 'the 3 different data node selectors for the given data job id is persisted'
-            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId').size() == 3
+            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId01').size() == 3
         and: 'get correlation ids from event sent to DMIs'
             def correlationIds = getAllConsumedCorrelationIds()
         and: 'there is correlation IDs (event) for each affected dmi (DMI-1, DMI-2)'
             assert correlationIds.size() == 2
-            assert correlationIds.containsAll(['myDataJobId#dmi-1', 'myDataJobId#dmi-2'])
+            assert correlationIds.containsAll(['myDataJobId01#dmi-1', 'myDataJobId01#dmi-2'])
     }
 
-    def 'Update subscription status'() {
+
+    def 'Create subscription accepted by DMI.'() {
         given: 'a persisted subscription'
             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
-            sendSubscriptionCreateRequest(subscriptionTopic, 'newDataJob', eventPayload)
+            sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob')
         when: 'dmi accepts the subscription create request'
             sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
         then: 'there are no more inactive data node selector for given datajob id'
             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0
         and: 'status for the data node selector for given data job id is ACCEPTED'
-            def affectedDataNodes =  cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
-                '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
-            assert affectedDataNodes.leaves.every( entry -> entry.get('status') == 'ACCEPTED')
+            def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
+            assert affectedDataNodes.leaves.every(entry -> entry.get('status') == 'ACCEPTED')
     }
 
     def 'Create new subscription which partially overlaps with an existing active subscription'() {
-        given: 'an existing data node selector on DMI-1'
-            def existingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
-        and: 'a new data node selector on DMI-2'
-            def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
-        and: 'an event payload'
-            def eventDataNodeSelector = (existingDmi1DataNodeSelector + newDmi2DataNodeSelector)
-            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJob', eventDataNodeSelector)
-        and: 'an active subscription in database'
+        given: 'an active subscription in database'
             createAndAcceptSubscriptionA()
-        when: 'a new subscription create request is sent'
-            sendSubscriptionCreateRequest(subscriptionTopic, 'partialOverlappingDataJob', eventPayload)
+        and: 'and a partial overlapping subscription'
+            def overlappingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
+            def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
+            def eventDataNodeSelector = (overlappingDmi1DataNodeSelector + newDmi2DataNodeSelector)
+            def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId', eventDataNodeSelector)
+        when: 'create request event for overlapping subscription is sent'
+            sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId')
         then: 'log shows event is consumed by ncmp'
             def messages = listAppender.list*.formattedMessage
-            messages.any { msg -> msg.contains('partialOverlappingDataJob') && msg.contains('dataJobCreated')}
+            messages.any { msg -> msg.contains('partialOverlappingDataJobId') && msg.contains('dataJobCreated') }
         and: 'the 3 data node selectors for the given data job id is persisted'
             assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
-                '//subscription/dataJobId[text()=\'partialOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 3
+                '//subscription/dataJobId[text()=\'partialOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY).size() == 3
         and: 'only one data node selector is not active'
-            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJob').size() == 1
+            assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJobId').size() == 1
         and: 'get correlation ids from event sent to DMIs'
             def correlationIds = getAllConsumedCorrelationIds()
         and: 'there is correlation IDs (event) for only the affected dmi (DMI-2)'
-            assert !correlationIds.contains('partialOverlappingDataJob#dmi-1')
-            assert correlationIds.contains('partialOverlappingDataJob#dmi-2')
+            assert !correlationIds.contains('partialOverlappingDataJobId#dmi-1')
+            assert correlationIds.contains('partialOverlappingDataJobId#dmi-2')
     }
 
     def 'Create new subscription which completely overlaps with an active existing subscriptions'() {
@@ -155,13 +154,13 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
             createAndAcceptSubscriptionA()
             createAndAcceptSubscriptionB()
         when: 'a new subscription create request is sent'
-            sendSubscriptionCreateRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload)
+            sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'myDataJobId')
         then: 'log shows event is consumed by ncmp'
             def messages = listAppender.list*.formattedMessage
-            messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated')}
+            messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') }
         and: 'the 2 data node selectors for the given data job id is persisted'
             assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
-                '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
+                    '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
         and: 'there are no inactive data node selector'
             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
         and: 'get correlation ids from event sent to DMIs'
@@ -170,12 +169,77 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
             assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
     }
 
+    def 'Delete subscription removes last subscriber.'() {
+        given: 'an existing subscription with only one data node selector'
+            def dataNodeSelector = '/parent[id=\\\"5\\\"]'
+        and: 'a subscription created'
+            def createEventPayload = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector)
+            sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId')
+        and: 'data nodes is persisted '
+            def dataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
+            assert  dataNodes.size() == 1
+            assert dataNodes.iterator().next().leaves.dataNodeSelector == '/parent[id="5"]'
+        when: 'a delete event is received for the subscription'
+            def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector)
+            sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId')
+        then: 'the subscription is fully removed from persistence'
+            def remainingDataNodeSelector = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
+            assert remainingDataNodeSelector.isEmpty()
+        and: 'no other subscriptions exist for the same dataJobId'
+            def remainingDataJobId = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
+            assert remainingDataJobId.isEmpty()
+        and: 'a DMI delete event is published for the affected DMI'
+            def correlationIds = getAllConsumedCorrelationIds()
+            assert correlationIds.contains('lastDataJobId#dmi-1')
+    }
+
+    def 'Delete subscription removes one of multiple subscribers.'() {
+        given: 'data node selector that is used by other subscriptions'
+            def dataNodeSelector = '/parent[id=\\\"1\\\"]'
+            def existingSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
+            assert !existingSubscription.isEmpty()
+        and: 'a new subscription'
+            def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector)
+            sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove')
+        when: 'a delete event is received'
+            def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector)
+            sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove')
+        then: 'the data job id does not exist in database'
+            def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    "//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY)
+            assert resultForDeletedSubscription.isEmpty()
+        and: 'subscription still exist for the same data node selector'
+            def remainingSubscriptions = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+                    "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
+            assert !remainingSubscriptions.isEmpty()
+            assert !remainingSubscriptions.contains('id-to-remove')
+        and: 'no DMI delete event is published'
+            def correlationIds = getAllConsumedCorrelationIds()
+            assert !correlationIds.contains(['id-to-remove#dmi-1'])
+    }
+
+    def 'Deleting non-existent subscription.'() {
+        given: 'an event payload'
+            def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting')
+        when: 'a delete event is received for the non-existent subscription'
+            sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'myDataJobId')
+        then: 'no exception is thrown'
+            noExceptionThrown()
+        and: 'nothing is sent to DMI'
+            getAllConsumedCorrelationIds().isEmpty()
+    }
+
     def registerCmHandlesForSubscriptions() {
-        registerCmHandle('dmi-0', 'cmHandle0', '','/parent=0')
-        registerCmHandle('dmi-1', 'cmHandle1', '','/parent=1')
-        registerCmHandle('dmi-1', 'cmHandle2', '','/parent=2')
-        registerCmHandle('dmi-2', 'cmHandle3', '','/parent=3')
-        registerCmHandle('dmi-2', 'cmHandle4', '','/parent=4')
+        registerCmHandle('dmi-0', 'cmHandle0', '', '/parent=0')
+        registerCmHandle('dmi-1', 'cmHandle1', '', '/parent=1')
+        registerCmHandle('dmi-1', 'cmHandle2', '', '/parent=2')
+        registerCmHandle('dmi-2', 'cmHandle3', '', '/parent=3')
+        registerCmHandle('dmi-2', 'cmHandle4', '', '/parent=4')
+        registerCmHandle('dmi-1', 'cmHandle5', '', '/parent=5')
     }
 
     def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) {
@@ -189,7 +253,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def createAndAcceptSubscriptionA() {
         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
-        sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobA', eventPayload)
+        sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA')
         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
     }
@@ -197,27 +261,28 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def createAndAcceptSubscriptionB() {
         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
-        sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobB', eventPayload)
+        sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB')
         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
     }
 
-    def sendSubscriptionCreateRequest(topic, eventKey, eventPayload) {
-        def event = new ProducerRecord<>(topic, eventKey, eventPayload);
+    def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) {
+        def event = new ProducerRecord<>(topic, eventKey, eventPayload)
         testRequestProducer.send(event)
-        sleep(1000)
+        //TODO Add polling within for log to report dataJobId is finished (separate commit)
+        sleep(2000)
     }
 
     def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
-        def eventPayload =  readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
+        def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
         eventPayload = eventPayload.replace('#statusCode', statusCode)
         eventPayload = eventPayload.replace('#statusMessage', statusMessage)
         def cloudEvent = CloudEventBuilder.v1()
-            .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
-            .withId('random-uuid')
-            .withType(eventType)
-            .withSource(URI.create(eventSource))
-            .withExtension('correlationid', correlationId).build()
-        def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent);
+                .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
+                .withId('random-uuid')
+                .withType(eventType)
+                .withSource(URI.create(eventSource))
+                .withExtension('correlationid', correlationId).build()
+        def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent)
         testResponseProducer.send(event)
         sleep(2000)
     }
@@ -236,7 +301,6 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
                 def value = new String(header.value())
                 headersMap[key] << value
             }
-
         }
         return headersMap
     }