Patch #2 : Introduce kafka template for cloud events
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumerSpec.groovy
index 5f54bbe..3dffac7 100644 (file)
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.mapstruct.factory.Mappers
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -48,7 +49,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
     AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class)
 
     @SpringBean
-    EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(kafkaTemplate)
+    EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
     AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper)
@@ -56,13 +57,13 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
 
     def 'Consume and forward valid message'() {
         given: 'consumer has a subscription on a topic'
             def cmEventsTopicName = 'cm-events'
             acvEventConsumer.cmEventsTopicName = cmEventsTopicName
-            kafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
+            legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
         and: 'an event is sent'
             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
@@ -73,7 +74,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
         when: 'the event is consumed'
             acvEventConsumer.consumeAndForward(consumerRecord)
         and: 'the topic is polled'
-            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+            def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'record can be converted to AVC event'