Data producer id field during cm handle create/update 37/141937/5
authormpriyank <priyank.maheshwari@est.tech>
Mon, 25 Aug 2025 13:09:21 +0000 (14:09 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Wed, 27 Aug 2025 12:38:16 +0000 (13:38 +0100)
- updated the CmHandleCreate test to prove that the LcmEvent is not
  triggered during the Cm Handle Create Flow
- added new test for CmHandleUpdate to prove the LcmEvent is triggered
  when updating the data producer id field during the Cm Handle Update
  flow
- refactored the common code and moved it to CpsIntegrationBase spec

Issue-ID: CPS-2963
Change-Id: I11e8b5d7c803334e0eb2979001e783d24c61c6d3
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy

index 99c7cb9..7de0ad5 100644 (file)
@@ -23,6 +23,8 @@ package org.onap.cps.integration.base
 
 import com.hazelcast.map.IMap
 import okhttp3.mockwebserver.MockWebServer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDataService
 import org.onap.cps.api.CpsDataspaceService
@@ -64,6 +66,7 @@ import org.testcontainers.spock.Testcontainers
 import spock.lang.Shared
 import spock.lang.Specification
 
+import java.time.Duration
 import java.time.OffsetDateTime
 import java.util.concurrent.BlockingQueue
 
@@ -77,6 +80,8 @@ import java.util.concurrent.BlockingQueue
 @ActiveProfiles('module-sync-delayed')
 abstract class CpsIntegrationSpecBase extends Specification {
 
+    static KafkaConsumer kafkaConsumer
+
     @Shared
     DatabaseTestContainer databaseTestContainer = DatabaseTestContainer.getInstance()
 
@@ -329,4 +334,22 @@ abstract class CpsIntegrationSpecBase extends Specification {
         networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
     }
 
+    def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+        kafkaConsumer = KafkaTestContainer.getConsumer(consumerGroupId, StringDeserializer.class)
+        kafkaConsumer.subscribe([topicName])
+        kafkaConsumer.poll(Duration.ofMillis(500))
+    }
+
+    def getLatestConsumerRecordsWithMaxPollOf1Second(numberOfRecordsToRead) {
+        def consumerRecords = []
+        def retryAttempts = 10
+        while (consumerRecords.size() < numberOfRecordsToRead) {
+            retryAttempts--
+            consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100)))
+            if (retryAttempts == 0)
+                break
+        }
+        return consumerRecords
+    }
+
 }
index 9263b2d..2e1c803 100644 (file)
 
 package org.onap.cps.integration.functional.ncmp.inventory
 
-import java.time.Duration
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.integration.KafkaTestContainer
+
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NcmpResponseStatus
-import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
+import org.onap.cps.ncmp.api.inventory.models.CmHandleState
 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
+import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
-import org.onap.cps.ncmp.api.inventory.models.CmHandleState
-import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
+import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
 
 class CmHandleCreateSpec extends CpsIntegrationSpecBase {
 
     NetworkCmProxyInventoryFacadeImpl objectUnderTest
     def uniqueId = 'ch-unique-id-for-create-test'
 
-    static KafkaConsumer kafkaConsumer
-
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
-        subscribeAndClearPreviousMessages()
+        subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
     }
 
     def cleanup() {
@@ -57,7 +52,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
 
         when: 'a CM-handle is registered for creation'
-            def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId)
+            def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId, dataProducerIdentifier: 'my-data-producer-identifier')
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
 
@@ -77,7 +72,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
 
         then: 'get the latest messages'
-            def consumerRecords = getLatestConsumerRecords()
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(2)
 
         and: 'both converted messages are for the correct cm handle'
             def notificationMessages = []
@@ -86,11 +81,16 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             }
             assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ]
 
-        and: 'the oldest event is about the update to ADVISED state'
-            notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED'
+        and: 'the oldest event is about the update to ADVISED state and it has a data producer id'
+            assert notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED'
+            assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-identifier'
+
+        and: 'the next event is about update to READY state and it has the same data producer identifier as in in ADVISED state'
+            assert notificationMessages[1].event.newValues.cmHandleState.value() == 'READY'
+            assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier'
 
-        and: 'the next event is about update to READY state'
-            notificationMessages[1].event.newValues.cmHandleState.value() == 'READY'
+        and: 'there are no more messages to be read'
+            assert getLatestConsumerRecordsWithMaxPollOf1Second(1).size() == 0
 
         cleanup: 'deregister CM handle'
             deregisterCmHandle(DMI1_URL, uniqueId)
@@ -215,22 +215,4 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
     }
 
-    def subscribeAndClearPreviousMessages() {
-        kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class)
-        kafkaConsumer.subscribe(['ncmp-events'])
-        kafkaConsumer.poll(Duration.ofMillis(500))
-    }
-
-    def getLatestConsumerRecords() {
-        def consumerRecords = []
-        def retryAttempts = 10
-        while (consumerRecords.size() < 2) {
-            retryAttempts--
-            consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100)))
-            if (retryAttempts == 0)
-                break
-        }
-        return consumerRecords
-    }
-
 }
index 8b3fc24..18c7096 100644 (file)
@@ -22,10 +22,11 @@ package org.onap.cps.integration.functional.ncmp.inventory
 
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NcmpResponseStatus
-import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
+import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
 
 class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
 
@@ -33,6 +34,12 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
 
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
+        subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
+    }
+
+    def cleanup() {
+        kafkaConsumer.unsubscribe()
+        kafkaConsumer.close()
     }
 
     def 'Update of CM-handle with new or unchanged alternate ID succeeds.'() {
@@ -86,4 +93,47 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
             deregisterCmHandle(DMI1_URL, 'ch-1')
     }
 
+    def 'CM Handle registration to verify changes in data producer identifier'() {
+        given: 'DMI will return modules when requested'
+            def cmHandleId = 'ch-id-for-update'
+            dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+
+        when: 'a CM-handle is registered for creation'
+
+            def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId)
+            def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
+            def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
+
+        then: 'registration gives successful response'
+            assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
+
+        then: 'the module sync watchdog is triggered'
+            moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
+
+        and: 'flush the latest cm handle registration events( state transition from NONE to ADVISED and ADVISED to READY)'
+            getLatestConsumerRecordsWithMaxPollOf1Second(2)
+
+        and: 'cm handle updated with the data producer identifier'
+            def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
+            def dmiPluginRegistrationForUpdate = new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate])
+            def dmiPluginRegistrationResponseForUpdate = objectUnderTest.updateDmiRegistration(dmiPluginRegistrationForUpdate)
+
+        then: 'registration gives successful response'
+            assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
+
+        and: 'get the latest message'
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(1)
+
+        and: 'the message has the updated data producer identifier'
+            def notificationMessages = []
+            for (def consumerRecord : consumerRecords) {
+                notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
+            }
+            assert notificationMessages[0].event.cmHandleId.contains(cmHandleId)
+            assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-id'
+
+        cleanup: 'deregister CM handle'
+            deregisterCmHandle(DMI1_URL, cmHandleId)
+    }
+
 }