From 8143111cedbc3870fd214df49bf930a219fdb91b Mon Sep 17 00:00:00 2001 From: shikha0203 Date: Thu, 23 Oct 2025 15:01:32 +0100 Subject: [PATCH] Integration tests for delete subscription - Deleting a subscription - for delete event, the subscription should no longer exist - deleting a non-existent subscription should not throw a system error - Use Unique IDs for most tests - Cleaned descriptions Issue-ID: CPS-3016 Change-Id: Iafb0b86cf6256ff4ac8aaca68a3a7242fbf72462 Signed-off-by: shikha0203 --- .../subscription/ncmp/NcmpInEventConsumer.java | 12 +- .../subscription/CmSubscriptionSpec.groovy | 168 ++++++++++++++------- 2 files changed, 124 insertions(+), 56 deletions(-) diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java index 55553dccae..cfa8ba6d4a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/NcmpInEventConsumer.java @@ -56,10 +56,14 @@ public class NcmpInEventConsumer { log.info("Consumed subscription event with details: | dataJobId={} | eventType={}", dataJobId, eventType); - switch (eventType) { - case "dataJobCreated" -> handleCreate(dataJobId, dataJob); - case "dataJobDeleted" -> cmSubscriptionHandler.deleteSubscription(dataJobId); - default -> log.warn("Unknown eventType={} for dataJobId={}", eventType, dataJobId); + try { + switch (eventType) { + case "dataJobCreated" -> handleCreate(dataJobId, dataJob); + case "dataJobDeleted" -> cmSubscriptionHandler.deleteSubscription(dataJobId); + default -> log.warn("Unknown eventType={} for dataJobId={}", eventType, dataJobId); + } + } finally { + log.info("NCMP In Event has been Processed for dataJobId={}", dataJobId); } } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy index f19a13f81b..d122c0e66a 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy @@ -22,11 +22,11 @@ package org.onap.cps.integration.functional.ncmp.datajobs.subscription import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY +import ch.qos.logback.classic.Logger import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.read.ListAppender import io.cloudevents.core.builder.CloudEventBuilder import io.cloudevents.kafka.CloudEventSerializer -import io.cloudevents.kafka.CloudEventDeserializer import java.nio.charset.StandardCharsets import java.time.Duration import org.apache.kafka.clients.producer.ProducerRecord @@ -57,7 +57,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def testResponseProducer def listAppender = new ListAppender() - def logger + def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer) def setup() { registerCmHandlesForSubscriptions() @@ -67,83 +67,82 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { dmiInConsumer.poll(Duration.ofMillis(500)) testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class) testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class) - logger = LoggerFactory.getLogger(NcmpInEventConsumer) listAppender.start() logger.addAppender(listAppender) } def cleanup() { + logger.detachAndStopAllAppenders() dmiInConsumer.unsubscribe() dmiInConsumer.close() testRequestProducer.close() testResponseProducer.close() kafkaTestContainer.close() deregisterCmHandles('dmi-0', ['cmHandle0']) - deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2']) + deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2', 'cmHandle5']) deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4']) } def 'Create subscription and send to multiple DMIs'() { - given: 'a data node selector on DMI-1' + given: 'data node selector with two paths on DMI-1' def dmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n''' - and: 'a data node selector on DMI-2' + and: 'data node selector with one path on DMI-2' def dmi2DataNodeSelector = '/parent[id=\\\"3\\\"]/child' and: 'an event payload' def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector) - def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId', eventDataNodeSelector) + def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', eventDataNodeSelector) when: 'a subscription create request is sent' - sendSubscriptionCreateRequest(subscriptionTopic, 'key', eventPayload) + sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01') then: 'log shows event is consumed by ncmp' def messages = listAppender.list*.formattedMessage - messages.any { msg -> msg.contains('myDataJobId') && msg.contains('dataJobCreated')} + messages.any { msg -> msg.contains('myDataJobId01') && msg.contains('dataJobCreated') } and: 'the 3 different data node selectors for the given data job id is persisted' - assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId').size() == 3 + assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId01').size() == 3 and: 'get correlation ids from event sent to DMIs' def correlationIds = getAllConsumedCorrelationIds() and: 'there is correlation IDs (event) for each affected dmi (DMI-1, DMI-2)' assert correlationIds.size() == 2 - assert correlationIds.containsAll(['myDataJobId#dmi-1', 'myDataJobId#dmi-2']) + assert correlationIds.containsAll(['myDataJobId01#dmi-1', 'myDataJobId01#dmi-2']) } - def 'Update subscription status'() { + + def 'Create subscription accepted by DMI.'() { given: 'a persisted subscription' def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n') - sendSubscriptionCreateRequest(subscriptionTopic, 'newDataJob', eventPayload) + sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob') when: 'dmi accepts the subscription create request' sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0') then: 'there are no more inactive data node selector for given datajob id' assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0 and: 'status for the data node selector for given data job id is ACCEPTED' - def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', - '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY) - assert affectedDataNodes.leaves.every( entry -> entry.get('status') == 'ACCEPTED') + def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', + '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY) + assert affectedDataNodes.leaves.every(entry -> entry.get('status') == 'ACCEPTED') } def 'Create new subscription which partially overlaps with an existing active subscription'() { - given: 'an existing data node selector on DMI-1' - def existingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n''' - and: 'a new data node selector on DMI-2' - def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]' - and: 'an event payload' - def eventDataNodeSelector = (existingDmi1DataNodeSelector + newDmi2DataNodeSelector) - def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJob', eventDataNodeSelector) - and: 'an active subscription in database' + given: 'an active subscription in database' createAndAcceptSubscriptionA() - when: 'a new subscription create request is sent' - sendSubscriptionCreateRequest(subscriptionTopic, 'partialOverlappingDataJob', eventPayload) + and: 'and a partial overlapping subscription' + def overlappingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n''' + def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]' + def eventDataNodeSelector = (overlappingDmi1DataNodeSelector + newDmi2DataNodeSelector) + def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId', eventDataNodeSelector) + when: 'create request event for overlapping subscription is sent' + sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId') then: 'log shows event is consumed by ncmp' def messages = listAppender.list*.formattedMessage - messages.any { msg -> msg.contains('partialOverlappingDataJob') && msg.contains('dataJobCreated')} + messages.any { msg -> msg.contains('partialOverlappingDataJobId') && msg.contains('dataJobCreated') } and: 'the 3 data node selectors for the given data job id is persisted' assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', - '//subscription/dataJobId[text()=\'partialOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 3 + '//subscription/dataJobId[text()=\'partialOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY).size() == 3 and: 'only one data node selector is not active' - assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJob').size() == 1 + assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJobId').size() == 1 and: 'get correlation ids from event sent to DMIs' def correlationIds = getAllConsumedCorrelationIds() and: 'there is correlation IDs (event) for only the affected dmi (DMI-2)' - assert !correlationIds.contains('partialOverlappingDataJob#dmi-1') - assert correlationIds.contains('partialOverlappingDataJob#dmi-2') + assert !correlationIds.contains('partialOverlappingDataJobId#dmi-1') + assert correlationIds.contains('partialOverlappingDataJobId#dmi-2') } def 'Create new subscription which completely overlaps with an active existing subscriptions'() { @@ -155,13 +154,13 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { createAndAcceptSubscriptionA() createAndAcceptSubscriptionB() when: 'a new subscription create request is sent' - sendSubscriptionCreateRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload) + sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'myDataJobId') then: 'log shows event is consumed by ncmp' def messages = listAppender.list*.formattedMessage - messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated')} + messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') } and: 'the 2 data node selectors for the given data job id is persisted' assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', - '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2 + '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2 and: 'there are no inactive data node selector' assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0 and: 'get correlation ids from event sent to DMIs' @@ -170,12 +169,77 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') } } + def 'Delete subscription removes last subscriber.'() { + given: 'an existing subscription with only one data node selector' + def dataNodeSelector = '/parent[id=\\\"5\\\"]' + and: 'a subscription created' + def createEventPayload = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector) + sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId') + and: 'data nodes is persisted ' + def dataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', + "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY) + assert dataNodes.size() == 1 + assert dataNodes.iterator().next().leaves.dataNodeSelector == '/parent[id="5"]' + when: 'a delete event is received for the subscription' + def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector) + sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId') + then: 'the subscription is fully removed from persistence' + def remainingDataNodeSelector = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', + "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY) + assert remainingDataNodeSelector.isEmpty() + and: 'no other subscriptions exist for the same dataJobId' + def remainingDataJobId = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', + "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY) + assert remainingDataJobId.isEmpty() + and: 'a DMI delete event is published for the affected DMI' + def correlationIds = getAllConsumedCorrelationIds() + assert correlationIds.contains('lastDataJobId#dmi-1') + } + + def 'Delete subscription removes one of multiple subscribers.'() { + given: 'data node selector that is used by other subscriptions' + def dataNodeSelector = '/parent[id=\\\"1\\\"]' + def existingSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', + "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId + assert !existingSubscription.isEmpty() + and: 'a new subscription' + def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector) + sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove') + when: 'a delete event is received' + def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector) + sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove') + then: 'the data job id does not exist in database' + def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', + "//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY) + assert resultForDeletedSubscription.isEmpty() + and: 'subscription still exist for the same data node selector' + def remainingSubscriptions = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions', + "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId + assert !remainingSubscriptions.isEmpty() + assert !remainingSubscriptions.contains('id-to-remove') + and: 'no DMI delete event is published' + def correlationIds = getAllConsumedCorrelationIds() + assert !correlationIds.contains(['id-to-remove#dmi-1']) + } + + def 'Deleting non-existent subscription.'() { + given: 'an event payload' + def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting') + when: 'a delete event is received for the non-existent subscription' + sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'myDataJobId') + then: 'no exception is thrown' + noExceptionThrown() + and: 'nothing is sent to DMI' + getAllConsumedCorrelationIds().isEmpty() + } + def registerCmHandlesForSubscriptions() { - registerCmHandle('dmi-0', 'cmHandle0', '','/parent=0') - registerCmHandle('dmi-1', 'cmHandle1', '','/parent=1') - registerCmHandle('dmi-1', 'cmHandle2', '','/parent=2') - registerCmHandle('dmi-2', 'cmHandle3', '','/parent=3') - registerCmHandle('dmi-2', 'cmHandle4', '','/parent=4') + registerCmHandle('dmi-0', 'cmHandle0', '', '/parent=0') + registerCmHandle('dmi-1', 'cmHandle1', '', '/parent=1') + registerCmHandle('dmi-1', 'cmHandle2', '', '/parent=2') + registerCmHandle('dmi-2', 'cmHandle3', '', '/parent=3') + registerCmHandle('dmi-2', 'cmHandle4', '', '/parent=4') + registerCmHandle('dmi-1', 'cmHandle5', '', '/parent=5') } def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) { @@ -189,7 +253,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def createAndAcceptSubscriptionA() { def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child''' def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector) - sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobA', eventPayload) + sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA') sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1') sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2') } @@ -197,27 +261,28 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def createAndAcceptSubscriptionB() { def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]''' def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector) - sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobB', eventPayload) + sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB') sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2') } - def sendSubscriptionCreateRequest(topic, eventKey, eventPayload) { - def event = new ProducerRecord<>(topic, eventKey, eventPayload); + def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) { + def event = new ProducerRecord<>(topic, eventKey, eventPayload) testRequestProducer.send(event) - sleep(1000) + //TODO Add polling within for log to report dataJobId is finished (separate commit) + sleep(2000) } def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) { - def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json') + def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json') eventPayload = eventPayload.replace('#statusCode', statusCode) eventPayload = eventPayload.replace('#statusMessage', statusMessage) def cloudEvent = CloudEventBuilder.v1() - .withData(eventPayload.getBytes(StandardCharsets.UTF_8)) - .withId('random-uuid') - .withType(eventType) - .withSource(URI.create(eventSource)) - .withExtension('correlationid', correlationId).build() - def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent); + .withData(eventPayload.getBytes(StandardCharsets.UTF_8)) + .withId('random-uuid') + .withType(eventType) + .withSource(URI.create(eventSource)) + .withExtension('correlationid', correlationId).build() + def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent) testResponseProducer.send(event) sleep(2000) } @@ -236,7 +301,6 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase { def value = new String(header.value()) headersMap[key] << value } - } return headersMap } -- 2.16.6