From ceda9de3a72e43710d1b5f9773ab1b4a9489e81c Mon Sep 17 00:00:00 2001 From: mpriyank Date: Mon, 25 Aug 2025 14:09:21 +0100 Subject: [PATCH] Data producer id field during cm handle create/update - updated the CmHandleCreate test to prove that the LcmEvent is not triggered during the Cm Handle Create Flow - added new test for CmHandleUpdate to prove the LcmEvent is triggered when updating the data producer id field during the Cm Handle Update flow - refactored the common code and moved it to CpsIntegrationBase spec Issue-ID: CPS-2963 Change-Id: I11e8b5d7c803334e0eb2979001e783d24c61c6d3 Signed-off-by: mpriyank --- .../integration/base/CpsIntegrationSpecBase.groovy | 23 ++++++++++ .../ncmp/inventory/CmHandleCreateSpec.groovy | 50 +++++++-------------- .../ncmp/inventory/CmHandleUpdateSpec.groovy | 52 +++++++++++++++++++++- 3 files changed, 90 insertions(+), 35 deletions(-) diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy index 99c7cb9be2..7de0ad5fe8 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy @@ -23,6 +23,8 @@ package org.onap.cps.integration.base import com.hazelcast.map.IMap import okhttp3.mockwebserver.MockWebServer +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.api.CpsAnchorService import org.onap.cps.api.CpsDataService import org.onap.cps.api.CpsDataspaceService @@ -64,6 +66,7 @@ import org.testcontainers.spock.Testcontainers import spock.lang.Shared import spock.lang.Specification +import java.time.Duration import java.time.OffsetDateTime import java.util.concurrent.BlockingQueue @@ -77,6 +80,8 @@ import java.util.concurrent.BlockingQueue @ActiveProfiles('module-sync-delayed') abstract class CpsIntegrationSpecBase extends Specification { + static KafkaConsumer kafkaConsumer + @Shared DatabaseTestContainer databaseTestContainer = DatabaseTestContainer.getInstance() @@ -329,4 +334,22 @@ abstract class CpsIntegrationSpecBase extends Specification { networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds)) } + def subscribeAndClearPreviousMessages(consumerGroupId, topicName) { + kafkaConsumer = KafkaTestContainer.getConsumer(consumerGroupId, StringDeserializer.class) + kafkaConsumer.subscribe([topicName]) + kafkaConsumer.poll(Duration.ofMillis(500)) + } + + def getLatestConsumerRecordsWithMaxPollOf1Second(numberOfRecordsToRead) { + def consumerRecords = [] + def retryAttempts = 10 + while (consumerRecords.size() < numberOfRecordsToRead) { + retryAttempts-- + consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100))) + if (retryAttempts == 0) + break + } + return consumerRecords + } + } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy index 9263b2df17..2e1c803009 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy @@ -20,30 +20,25 @@ package org.onap.cps.integration.functional.ncmp.inventory -import java.time.Duration -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.cps.integration.KafkaTestContainer + import org.onap.cps.integration.base.CpsIntegrationSpecBase import org.onap.cps.ncmp.api.NcmpResponseStatus -import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse +import org.onap.cps.ncmp.api.inventory.models.CmHandleState import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration +import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle import org.onap.cps.ncmp.events.lcm.v1.LcmEvent -import org.onap.cps.ncmp.api.inventory.models.CmHandleState -import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory +import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl class CmHandleCreateSpec extends CpsIntegrationSpecBase { NetworkCmProxyInventoryFacadeImpl objectUnderTest def uniqueId = 'ch-unique-id-for-create-test' - static KafkaConsumer kafkaConsumer - def setup() { objectUnderTest = networkCmProxyInventoryFacade - subscribeAndClearPreviousMessages() + subscribeAndClearPreviousMessages('test-group', 'ncmp-events') } def cleanup() { @@ -57,7 +52,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2'] when: 'a CM-handle is registered for creation' - def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId) + def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId, dataProducerIdentifier: 'my-data-producer-identifier') def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]) def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) @@ -77,7 +72,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort() then: 'get the latest messages' - def consumerRecords = getLatestConsumerRecords() + def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(2) and: 'both converted messages are for the correct cm handle' def notificationMessages = [] @@ -86,11 +81,16 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { } assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ] - and: 'the oldest event is about the update to ADVISED state' - notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED' + and: 'the oldest event is about the update to ADVISED state and it has a data producer id' + assert notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED' + assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-identifier' + + and: 'the next event is about update to READY state and it has the same data producer identifier as in in ADVISED state' + assert notificationMessages[1].event.newValues.cmHandleState.value() == 'READY' + assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier' - and: 'the next event is about update to READY state' - notificationMessages[1].event.newValues.cmHandleState.value() == 'READY' + and: 'there are no more messages to be read' + assert getLatestConsumerRecordsWithMaxPollOf1Second(1).size() == 0 cleanup: 'deregister CM handle' deregisterCmHandle(DMI1_URL, uniqueId) @@ -215,22 +215,4 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2']) } - def subscribeAndClearPreviousMessages() { - kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class) - kafkaConsumer.subscribe(['ncmp-events']) - kafkaConsumer.poll(Duration.ofMillis(500)) - } - - def getLatestConsumerRecords() { - def consumerRecords = [] - def retryAttempts = 10 - while (consumerRecords.size() < 2) { - retryAttempts-- - consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100))) - if (retryAttempts == 0) - break - } - return consumerRecords - } - } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy index 8b3fc24abf..18c7096b66 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy @@ -22,10 +22,11 @@ package org.onap.cps.integration.functional.ncmp.inventory import org.onap.cps.integration.base.CpsIntegrationSpecBase import org.onap.cps.ncmp.api.NcmpResponseStatus -import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent +import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl class CmHandleUpdateSpec extends CpsIntegrationSpecBase { @@ -33,6 +34,12 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase { def setup() { objectUnderTest = networkCmProxyInventoryFacade + subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events') + } + + def cleanup() { + kafkaConsumer.unsubscribe() + kafkaConsumer.close() } def 'Update of CM-handle with new or unchanged alternate ID succeeds.'() { @@ -86,4 +93,47 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase { deregisterCmHandle(DMI1_URL, 'ch-1') } + def 'CM Handle registration to verify changes in data producer identifier'() { + given: 'DMI will return modules when requested' + def cmHandleId = 'ch-id-for-update' + dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2'] + + when: 'a CM-handle is registered for creation' + + def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId) + def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]) + def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration) + + then: 'registration gives successful response' + assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)] + + then: 'the module sync watchdog is triggered' + moduleSyncWatchdog.moduleSyncAdvisedCmHandles() + + and: 'flush the latest cm handle registration events( state transition from NONE to ADVISED and ADVISED to READY)' + getLatestConsumerRecordsWithMaxPollOf1Second(2) + + and: 'cm handle updated with the data producer identifier' + def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id') + def dmiPluginRegistrationForUpdate = new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]) + def dmiPluginRegistrationResponseForUpdate = objectUnderTest.updateDmiRegistration(dmiPluginRegistrationForUpdate) + + then: 'registration gives successful response' + assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)] + + and: 'get the latest message' + def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(1) + + and: 'the message has the updated data producer identifier' + def notificationMessages = [] + for (def consumerRecord : consumerRecords) { + notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent)) + } + assert notificationMessages[0].event.cmHandleId.contains(cmHandleId) + assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-id' + + cleanup: 'deregister CM handle' + deregisterCmHandle(DMI1_URL, cmHandleId) + } + } -- 2.16.6