Test scenarios for async infra code 22/128722/4
authormpriyank <priyank.maheshwari@est.tech>
Tue, 19 Apr 2022 12:44:45 +0000 (13:44 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Mon, 25 Apr 2022 11:12:14 +0000 (12:12 +0100)
Issue-ID: CPS-987
Change-Id: Ie4c40b91e45ad325658a32eb474840af761e0029
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
src/test/resources/application.yml

index 4fc697e..f5bc4ac 100644 (file)
@@ -35,5 +35,7 @@ class NcmpKafkaPublisherServiceSpec extends Specification {
             objectUnderTest.publishToNcmp(messageKey, message)
         then: 'no exception is thrown'
             noExceptionThrown()
+        and: 'message is published once'
+            1 * mockNcmpKafkaPublisher.sendMessage(messageKey, message)
     }
 }
index 54f3502..00c8e6e 100644 (file)
 
 package org.onap.cps.ncmp.dmi.service
 
-
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.beans.factory.annotation.Value
 import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
 import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.DynamicPropertyRegistry
 import org.springframework.test.context.DynamicPropertySource
 import org.testcontainers.containers.KafkaContainer
 import org.testcontainers.spock.Testcontainers
 import spock.lang.Specification
 
+import java.time.Duration
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
+
 @SpringBootTest
 @Testcontainers
+@DirtiesContext
 class NcmpKafkaPublisherSpec extends Specification {
 
     static kafkaTestContainer = new KafkaContainer()
@@ -50,16 +65,31 @@ class NcmpKafkaPublisherSpec extends Specification {
     @Value('${app.ncmp.async-m2m.topic}')
     String topic
 
-    def 'Publish message'() {
+    KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConsumerConfig())
+
+    def 'Publish and Subscribe message'() {
         given: 'a sample messsage and key'
             def message = 'sample message'
             def messageKey = 'message-key'
             def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic)
         when: 'a message is published'
             objectUnderTest.sendMessage(messageKey, message)
-        then: 'no exception is thrown'
-            noExceptionThrown()
+        then: 'a message is consumed'
+            consumer.subscribe([topic] as List<String>)
+            def records = consumer.poll(Duration.ofMillis(1000))
+            assert records.size() == 1
+            assert messageKey == records[0].key
+            assert message == records[0].value
+    }
 
+    def kafkaConsumerConfig() {
+        def configs = [:]
+        configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.name)
+        configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.name)
+        configs.put(AUTO_OFFSET_RESET_CONFIG, 'earliest')
+        configs.put(BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers().split(",")[0])
+        configs.put(GROUP_ID_CONFIG, 'test')
+        return configs
     }
 
     @DynamicPropertySource
@@ -67,3 +97,11 @@ class NcmpKafkaPublisherSpec extends Specification {
         registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
     }
 }
+
+@Configuration
+class TopicConfig {
+    @Bean
+    NewTopic newTopic() {
+        return new NewTopic("my-topic-name", 1, (short) 1);
+    }
+}
index 344743b..afaaa4d 100644 (file)
@@ -57,5 +57,5 @@ spring:
 app:
   ncmp:
     async-m2m:
-      topic: ncmp-async-m2m
+      topic: my-topic-name