Patch #2: Introduce Kafka template for cloud events
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
index 28464bb..02071cd 100644
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.SerializationUtils
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1
@@ -46,7 +47,7 @@ import java.time.Duration
 class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(kafkaTemplate)
+    EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
     NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher)
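
For context, the two template fields referenced in the new constructor call come from the shared test base class. Below is a minimal, hypothetical sketch of what MessagingBaseSpec is assumed to provide after this patch; the field names mirror the test above, but the real wiring, broker address, and serializer choices may differ:

    import io.cloudevents.CloudEvent
    import io.cloudevents.kafka.CloudEventSerializer
    import org.apache.kafka.common.serialization.StringSerializer
    import org.springframework.kafka.core.DefaultKafkaProducerFactory
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.kafka.support.serializer.JsonSerializer
    import spock.lang.Specification

    class MessagingBaseSpec extends Specification {

        // assumed shared producer config; the value serializer is the only
        // difference between the legacy and the cloud-event template
        def eventProducerConfigProperties(valueSerializer) {
            ['bootstrap.servers': 'localhost:9092',   // assumed test broker address
             'key.serializer'   : StringSerializer,
             'value.serializer' : valueSerializer]
        }

        // legacy events stay on a JSON-serialized template
        def legacyEventKafkaTemplate = new KafkaTemplate<>(
            new DefaultKafkaProducerFactory<String, Object>(eventProducerConfigProperties(JsonSerializer)))

        // cloud events get their own template using the CloudEvents Kafka serializer
        def cloudEventKafkaTemplate = new KafkaTemplate<>(
            new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
    }

With both templates injected, EventsPublisher can route plain JSON events through the legacy template and CloudEvents through the new one, which is why the @SpringBean construction above now takes two arguments.
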
@@ -57,19 +58,19 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
     @Autowired
     RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
+    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
     def static clientTopic = 'client-topic'
     def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1'
 
     def 'Consume and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
-            kafkaConsumer.subscribe([clientTopic])
+            legacyEventKafkaConsumer.subscribe([clientTopic])
         and: 'consumer record for batch event'
             def consumerRecordIn = createConsumerRecord(batchEventType)
         when: 'the batch event is consumed and published to client specified topic'
             asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn)
         and: 'the client specified topic is polled'
-            def consumerRecordOut = kafkaConsumer.poll(Duration.ofMillis(1500))[0]
+            def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
         then: 'verifying consumed event operationID is same as published event operationID'
             def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId
             def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId
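
The consumer side is parameterised the same way: the old consumerConfigProperties helper is assumed to have been renamed and given a deserializer argument, so this legacy-event test can keep StringDeserializer while cloud-event tests can plug in a CloudEvents deserializer. A hedged sketch of the helper's shape, with illustrative property values:

    // hypothetical shape of the renamed helper assumed on MessagingBaseSpec;
    // StringDeserializer is the Kafka class imported at the top of the test
    def eventConsumerConfigProperties(consumerGroupId, valueDeserializer) {
        ['bootstrap.servers' : 'localhost:9092',   // assumed test broker address
         'group.id'          : consumerGroupId,
         'auto.offset.reset' : 'earliest',
         'key.deserializer'  : StringDeserializer,
         'value.deserializer': valueDeserializer]
    }

A cloud-event counterpart of this test could then subscribe with new KafkaConsumer<>(eventConsumerConfigProperties('test', io.cloudevents.kafka.CloudEventDeserializer)) instead of the StringDeserializer used here.
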