Code review cleanup for: Add kafka messaging support to integration test module
[cps.git] / integration-test / src / test / groovy / org / onap / cps / integration / functional / NcmpCmHandleCreateSpec.groovy
index 6b6f62e..f03872d 100644 (file)
 
 package org.onap.cps.integration.functional
 
+import java.time.Duration
 import java.time.OffsetDateTime
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService
 import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
@@ -28,12 +32,15 @@ import org.onap.cps.ncmp.api.impl.inventory.LockReasonCategory
 import org.onap.cps.ncmp.api.models.CmHandleRegistrationResponse
 import org.onap.cps.ncmp.api.models.DmiPluginRegistration
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import spock.util.concurrent.PollingConditions
 
 class NcmpCmHandleCreateSpec extends CpsIntegrationSpecBase {
 
     NetworkCmProxyDataService objectUnderTest
 
+    def kafkaConsumer = KafkaTestContainer.getConsumer('ncmp-group', StringDeserializer.class)
+
     static final MODULE_REFERENCES_RESPONSE_A = readResourceDataFile('mock-dmi-responses/bookStoreAWithModules_M1_M2_Response.json')
     static final MODULE_RESOURCES_RESPONSE_A = readResourceDataFile('mock-dmi-responses/bookStoreAWithModules_M1_M2_ResourcesResponse.json')
     static final MODULE_REFERENCES_RESPONSE_B = readResourceDataFile('mock-dmi-responses/bookStoreBWithModules_M1_M3_Response.json')
@@ -47,6 +54,9 @@ class NcmpCmHandleCreateSpec extends CpsIntegrationSpecBase {
         given: 'DMI will return modules when requested'
             mockDmiResponsesForModuleSync(DMI_URL, 'ch-1', MODULE_REFERENCES_RESPONSE_A, MODULE_RESOURCES_RESPONSE_A)
 
+        and: 'consumer subscribed to topic'
+            kafkaConsumer.subscribe(['ncmp-events'])
+
         when: 'a CM-handle is registered for creation'
             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI_URL, createdCmHandles: [cmHandleToCreate])
@@ -66,6 +76,14 @@ class NcmpCmHandleCreateSpec extends CpsIntegrationSpecBase {
                 assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState
             })
 
+        and: 'the messages is polled'
+            def message = kafkaConsumer.poll(Duration.ofMillis(10000))
+            def records = message.records(new TopicPartition('ncmp-events', 0))
+
+        and: 'the newest lcm event notification is received with READY state'
+            def notificationMessage = jsonObjectMapper.convertJsonString(records.last().value().toString(), LcmEvent)
+            assert notificationMessage.event.newValues.cmHandleState.value() == 'READY'
+
         and: 'the CM-handle has expected modules'
             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences('ch-1').moduleName.sort()