Verify LCM events during registration 05/139305/12
author    mpriyank <priyank.maheshwari@est.tech>
Fri, 25 Oct 2024 08:55:22 +0000 (09:55 +0100)
committer    mpriyank <priyank.maheshwari@est.tech>
Mon, 4 Nov 2024 13:56:52 +0000 (13:56 +0000)
- verified the LCM state transitions to ADVISED and then to READY
  during CM handle registration
- enhanced the base Kafka test container to be thread safe (see the
  singleton sketch after the KafkaTestContainer diff)
- changed the auto offset reset policy for the integration tests from
  earliest to latest (the Kafka default)
- added a retry mechanism to poll for the records (see the consumer
  sketch after the commit message)

Issue-ID: CPS-2468
Change-Id: Iabe603e1f5dd985899f04f5ace5d082acef7567a
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
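
The test-side pattern behind the last two bullets — a consumer created with
auto.offset.reset=latest, one initial poll to join the group and skip stale
records, then a bounded retry loop until the expected LCM events arrive — can
be summarised in plain Java. The sketch below only mirrors the Groovy test in
this change; the broker address and the expected record count of two are
assumptions for illustration, not part of the patch.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class LcmEventPollingSketch {

        public static void main(final String[] args) {
            final Properties config = new Properties();
            // assumption: a locally reachable broker; the integration test gets this from the test container
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 'latest' (the Kafka default) means the consumer only sees records produced
            // after it joins, so events left over from earlier tests are not picked up.
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
                consumer.subscribe(List.of("ncmp-events"));
                // First poll completes the group join / partition assignment and
                // drains anything already buffered before the test produces events.
                consumer.poll(Duration.ofMillis(500));

                // Bounded retry loop: keep polling until the two expected LCM events
                // (ADVISED then READY) arrive, or the attempts run out.
                final List<ConsumerRecord<String, String>> records = new ArrayList<>();
                int remainingAttempts = 10;
                while (records.size() < 2 && remainingAttempts-- > 0) {
                    consumer.poll(Duration.ofMillis(100)).forEach(records::add);
                }
                records.forEach(record -> System.out.println(record.value()));
            }
        }
    }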
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java

index 19b10a3..00ce38f 100644 (file)
@@ -20,7 +20,7 @@
 
 package org.onap.cps.integration.functional.ncmp
 
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
@@ -32,7 +32,6 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
 import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory
-import spock.lang.Ignore
 import spock.util.concurrent.PollingConditions
 
 import java.time.Duration
@@ -42,21 +41,23 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
     NetworkCmProxyInventoryFacade objectUnderTest
     def uniqueId = 'ch-unique-id-for-create-test'
 
-    def kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class)
+    static KafkaConsumer kafkaConsumer
 
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
+        subscribeAndClearPreviousMessages()
     }
 
-    @Ignore
-    def 'CM Handle registration is successful.'() {
+    def cleanupSpec() {
+        kafkaConsumer.unsubscribe()
+        kafkaConsumer.close()
+    }
+
+    def 'CM Handle registration.'() {
         given: 'DMI will return modules when requested'
             dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
             dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
 
-        and: 'consumer subscribed to topic'
-            kafkaConsumer.subscribe(['ncmp-events'])
-
         when: 'a CM-handle is registered for creation'
             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId)
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
@@ -68,32 +69,33 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
         and: 'CM-handle is initially in ADVISED state'
             assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
 
-        and: 'the module sync watchdog is triggered'
+        then: 'the module sync watchdog is triggered'
             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
 
-        and: 'CM-handle goes to READY state after module sync'
+        then: 'CM-handle goes to READY state after module sync'
             new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
                 assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
             })
 
-        and: 'the messages is polled'
-            def message = kafkaConsumer.poll(Duration.ofMillis(10000))
-            def records = message.records(new TopicPartition('ncmp-events', 0))
-
-        and: 'the newest lcm event notification is received with READY state'
-            def notificationMessage = jsonObjectMapper.convertJsonString(records.last().value().toString(), LcmEvent)
-            /*TODO (Toine) This test was failing intermittently (when running as part of suite).
-                I suspect that it often gave false positives as the message being assert here was any random message created by previous tests
-                By checking the cm-handle and using an unique cm-handle in this test this flaw became obvious.
-                I have now ignored this test as it is out of scope of this commit to fix it.
-                Created: https://lf-onap.atlassian.net/browse/CPS-2468 to fix this instead
-             */
-            assert notificationMessage.event.cmHandleId == uniqueId
-            assert notificationMessage.event.newValues.cmHandleState.value() == 'READY'
-
         and: 'the CM-handle has expected modules'
             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
 
+        then: 'get the latest messages'
+            def consumerRecords = getLatestConsumerRecords()
+
+        and: 'both converted messages are for the correct cm handle'
+            def notificationMessages = []
+            for (def consumerRecord : consumerRecords) {
+                notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
+            }
+            assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ]
+
+        and: 'the oldest event is about the update to ADVISED state'
+            notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED'
+
+        and: 'the next event is about update to READY state'
+            notificationMessages[1].event.newValues.cmHandleState.value() == 'READY'
+
         cleanup: 'deregister CM handle'
             deregisterCmHandle(DMI1_URL, uniqueId)
     }
@@ -224,4 +226,23 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
         cleanup: 'deregister CM handles'
             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
     }
+
+    def subscribeAndClearPreviousMessages() {
+        kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class)
+        kafkaConsumer.subscribe(['ncmp-events'])
+        kafkaConsumer.poll(Duration.ofMillis(500))
+    }
+
+    def getLatestConsumerRecords() {
+        def consumerRecords = []
+        def retryAttempts = 10
+        while (consumerRecords.size() < 2) {
+            retryAttempts--
+            consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100)))
+            if (retryAttempts == 0)
+                break
+        }
+        consumerRecords
+    }
+
 }
index d41f752..ff4aec4 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.cps.integration;
 
 import java.util.HashMap;
 import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -33,11 +34,12 @@ import org.testcontainers.utility.DockerImageName;
  * This ensures only one instance of Kafka container across the integration tests.
  * Avoid unnecessary resource and time consumption.
  */
+@Slf4j
 public class KafkaTestContainer extends KafkaContainer {
 
     private static final String IMAGE_NAME_AND_VERSION = "registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1";
 
-    private static KafkaTestContainer kafkaTestContainer;
+    private static volatile KafkaTestContainer kafkaTestContainer;
 
     private KafkaTestContainer() {
         super(DockerImageName.parse(IMAGE_NAME_AND_VERSION).asCompatibleSubstituteFor("confluentinc/cp-kafka"));
@@ -51,8 +53,15 @@ public class KafkaTestContainer extends KafkaContainer {
      */
     public static KafkaTestContainer getInstance() {
         if (kafkaTestContainer == null) {
-            kafkaTestContainer = new KafkaTestContainer();
-            Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::close));
+            synchronized (KafkaTestContainer.class) {
+                if (kafkaTestContainer == null) {
+                    kafkaTestContainer = new KafkaTestContainer();
+                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                        log.info("Shutting down KafkaTestContainer...");
+                        kafkaTestContainer.stop();
+                    }));
+                }
+            }
         }
         return kafkaTestContainer;
     }
@@ -63,8 +72,11 @@ public class KafkaTestContainer extends KafkaContainer {
 
     @Override
     public void start() {
-        super.start();
-        System.setProperty("spring.kafka.properties.bootstrap.servers", kafkaTestContainer.getBootstrapServers());
+        if (!isRunning()) {
+            super.start();
+            System.setProperty("spring.kafka.properties.bootstrap.servers", getBootstrapServers());
+            log.info("KafkaTestContainer started at {}", getBootstrapServers());
+        }
     }
 
     @Override
@@ -78,8 +90,9 @@ public class KafkaTestContainer extends KafkaContainer {
         configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers());
         configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
-        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
         configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
+        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE);
         return configProps;
     }
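
For reference, the thread-safety change in getInstance() above is the standard
double-checked locking idiom: a volatile field plus a synchronized block so
only one container is ever created and its construction is safely published to
all threads. The sketch below is a generic, self-contained illustration of the
idiom; the class and method names are placeholders, not part of
KafkaTestContainer.

    // Generic double-checked locking singleton: the field is volatile so a fully
    // constructed instance is visible to all threads, and the synchronized block
    // guarantees only one thread ever performs the expensive set-up.
    public final class SingletonContainerHolder {

        private static volatile SingletonContainerHolder instance;

        private SingletonContainerHolder() {
            // expensive set-up, e.g. starting a test container
        }

        public static SingletonContainerHolder getInstance() {
            if (instance == null) {                      // first check, no locking (fast path)
                synchronized (SingletonContainerHolder.class) {
                    if (instance == null) {              // second check, inside the lock
                        instance = new SingletonContainerHolder();
                        Runtime.getRuntime().addShutdownHook(new Thread(instance::shutdown));
                    }
                }
            }
            return instance;
        }

        private void shutdown() {
            // stop the shared resource once, when the JVM exits
        }
    }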