Merge "Updating the Kafka listener compliance to could events and legacy"
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / kafka / MessagingBaseSpec.groovy
index f7c41ec..0356c3f 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2022 Nordix Foundation.
+ * Copyright (c) 2023 Nordix Foundation.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
 
 package org.onap.cps.ncmp.api.kafka
 
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -33,39 +35,41 @@ import spock.lang.Specification
 
 class MessagingBaseSpec extends Specification {
 
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
-    }
-
     def setupSpec() {
         kafkaTestContainer.start()
     }
 
+    def cleanupSpec() {
+        kafkaTestContainer.stop()
+    }
+
     static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
 
-    def producerConfigProperties() {
+    def legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(eventProducerConfigProperties(JsonSerializer)))
+
+    def cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+    }
+
+    def eventProducerConfigProperties(valueSerializer) {
         return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('retries')          : 0,
                 ('batch-size')       : 16384,
                 ('linger.ms')        : 1,
                 ('buffer.memory')    : 33554432,
                 ('key.serializer')   : StringSerializer,
-                ('value.serializer') : JsonSerializer]
+                ('value.serializer') : valueSerializer]
     }
 
-    def consumerConfigProperties(consumerGroupId) {
+    def eventConsumerConfigProperties(consumerGroupId, valueSerializer) {
         return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('key.deserializer')  : StringDeserializer,
-                ('value.deserializer'): StringDeserializer,
+                ('value.deserializer'): valueSerializer,
                 ('auto.offset.reset') : 'earliest',
                 ('group.id')          : consumerGroupId
         ]
     }
-
-    def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
-
-    @DynamicPropertySource
-    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
-        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
-    }
 }