Patch #2 : Introduce kafka template for cloud events
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / kafka / MessagingBaseSpec.groovy
index 337178e..603b8cd 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.onap.cps.ncmp.api.kafka
 
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
 import org.spockframework.spring.SpringBean
@@ -44,30 +46,33 @@ class MessagingBaseSpec extends Specification {
 
     static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
 
-    def producerConfigProperties() {
+    @SpringBean
+    KafkaTemplate legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(eventProducerConfigProperties(JsonSerializer)))
+
+    @SpringBean
+    KafkaTemplate cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+    }
+
+    def eventProducerConfigProperties(valueSerializer) {
         return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('retries')          : 0,
                 ('batch-size')       : 16384,
                 ('linger.ms')        : 1,
                 ('buffer.memory')    : 33554432,
                 ('key.serializer')   : StringSerializer,
-                ('value.serializer') : JsonSerializer]
+                ('value.serializer') : valueSerializer]
     }
 
-    def consumerConfigProperties(consumerGroupId) {
+    def eventConsumerConfigProperties(consumerGroupId, valueSerializer) {
         return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('key.deserializer')  : StringDeserializer,
-                ('value.deserializer'): StringDeserializer,
+                ('value.deserializer'): valueSerializer,
                 ('auto.offset.reset') : 'earliest',
                 ('group.id')          : consumerGroupId
         ]
     }
-
-    @SpringBean
-    KafkaTemplate kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
-
-    @DynamicPropertySource
-    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
-        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
-    }
 }