Add kafka messaging support to integration test module 96/137496/11
authorhalil.cakal <halil.cakal@est.tech>
Mon, 11 Mar 2024 14:19:36 +0000 (14:19 +0000)
committerhalil.cakal <halil.cakal@est.tech>
Tue, 19 Mar 2024 16:05:17 +0000 (16:05 +0000)
- add ncmp test using kafka to integration test suite

Issue-ID: CPS-2152

Change-Id: Ia486bbcf1590ecf3ec6cbc2f513b74d55e4d6a31
Signed-off-by: halil.cakal <halil.cakal@est.tech>
integration-test/pom.xml
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmHandleCreateSpec.groovy
integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java [new file with mode: 0644]

index b379e9f..c845132 100644 (file)
             <artifactId>spock</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-web</artifactId>
index 33945a6..5020dce 100644 (file)
@@ -27,6 +27,7 @@ import org.onap.cps.api.CpsDataspaceService
 import org.onap.cps.api.CpsModuleService
 import org.onap.cps.api.CpsQueryService
 import org.onap.cps.integration.DatabaseTestContainer
+import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.ncmp.api.NetworkCmProxyCmHandleQueryService
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService
 import org.onap.cps.ncmp.api.NetworkCmProxyQueryService
@@ -59,9 +60,9 @@ import java.time.format.DateTimeFormatter
 
 import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo
 import static org.springframework.test.web.client.response.MockRestResponseCreators.withStatus
-import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME;
-import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR;
-import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT;
+import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME
+import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
+import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT
 
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK, classes = [CpsDataspaceService])
 @Testcontainers
@@ -75,6 +76,9 @@ abstract class CpsIntegrationSpecBase extends Specification {
     @Shared
     DatabaseTestContainer databaseTestContainer = DatabaseTestContainer.getInstance()
 
+    @Shared
+    KafkaTestContainer kafkaTestContainer = KafkaTestContainer.getInstance();
+
     @Autowired
     MockMvc mvc;
 
index 6b6f62e..d20ac33 100644 (file)
 
 package org.onap.cps.integration.functional
 
-import java.time.OffsetDateTime
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService
 import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
@@ -30,10 +33,15 @@ import org.onap.cps.ncmp.api.models.DmiPluginRegistration
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
 import spock.util.concurrent.PollingConditions
 
+import java.time.Duration
+import java.time.OffsetDateTime
+
 class NcmpCmHandleCreateSpec extends CpsIntegrationSpecBase {
 
     NetworkCmProxyDataService objectUnderTest
 
+    def kafkaConsumer = KafkaTestContainer.getConsumer("ncmp-group", StringDeserializer.class);
+
     static final MODULE_REFERENCES_RESPONSE_A = readResourceDataFile('mock-dmi-responses/bookStoreAWithModules_M1_M2_Response.json')
     static final MODULE_RESOURCES_RESPONSE_A = readResourceDataFile('mock-dmi-responses/bookStoreAWithModules_M1_M2_ResourcesResponse.json')
     static final MODULE_REFERENCES_RESPONSE_B = readResourceDataFile('mock-dmi-responses/bookStoreBWithModules_M1_M3_Response.json')
@@ -47,6 +55,9 @@ class NcmpCmHandleCreateSpec extends CpsIntegrationSpecBase {
         given: 'DMI will return modules when requested'
             mockDmiResponsesForModuleSync(DMI_URL, 'ch-1', MODULE_REFERENCES_RESPONSE_A, MODULE_RESOURCES_RESPONSE_A)
 
+        and: 'consumer subscribed to topic'
+            kafkaConsumer.subscribe(['ncmp-events'])
+
         when: 'a CM-handle is registered for creation'
             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI_URL, createdCmHandles: [cmHandleToCreate])
@@ -66,6 +77,13 @@ class NcmpCmHandleCreateSpec extends CpsIntegrationSpecBase {
                 assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState
             })
 
+        and: 'the messages is polled'
+            def message = kafkaConsumer.poll(Duration.ofMillis(10000))
+            def records = message.records(new TopicPartition('ncmp-events', 0))
+
+        and: 'the newest lcm event notification is received with READY state'
+            assert records.last().value().toString().contains('"cmHandleState":"READY"')
+
         and: 'the CM-handle has expected modules'
             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences('ch-1').moduleName.sort()
 
diff --git a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
new file mode 100644 (file)
index 0000000..d41f752
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * The Apache Kafka test container wrapper.
+ * Allow to use specific image and version with Singleton design pattern.
+ * This ensures only one instance of Kafka container across the integration tests.
+ * Avoid unnecessary resource and time consumption.
+ */
+public class KafkaTestContainer extends KafkaContainer {
+
+    private static final String IMAGE_NAME_AND_VERSION = "registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1";
+
+    private static KafkaTestContainer kafkaTestContainer;
+
+    private KafkaTestContainer() {
+        super(DockerImageName.parse(IMAGE_NAME_AND_VERSION).asCompatibleSubstituteFor("confluentinc/cp-kafka"));
+    }
+
+    /**
+     * Provides an instance of Kafka test container wrapper.
+     * This will allow to initialize Kafka messaging support before any integration test run.
+     *
+     * @return KafkaTestContainer the unique Kafka instance
+     */
+    public static KafkaTestContainer getInstance() {
+        if (kafkaTestContainer == null) {
+            kafkaTestContainer = new KafkaTestContainer();
+            Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::close));
+        }
+        return kafkaTestContainer;
+    }
+
+    public static KafkaConsumer getConsumer(final String consumerGroupId, final Object valueDeserializer) {
+        return new KafkaConsumer<>(consumerProperties(consumerGroupId, valueDeserializer));
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        System.setProperty("spring.kafka.properties.bootstrap.servers", kafkaTestContainer.getBootstrapServers());
+    }
+
+    @Override
+    public void stop() {
+        // Method intentionally left blank
+    }
+
+    private static Map<String, Object> consumerProperties(final String consumerGroupId,
+                                                          final Object valueDeserializer) {
+        final Map<String, Object> configProps = new HashMap<>();
+        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers());
+        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
+        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
+        return configProps;
+    }
+
+}