Updating the Kafka listener compliance to could events and legacy
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / events / lcm / LcmEventsPublisherSpec.groovy
index 7c9464d..223c92f 100644 (file)
@@ -22,31 +22,33 @@ package org.onap.cps.ncmp.api.impl.events.lcm
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.events.lcm.v1.Event
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
-import org.onap.ncmp.cmhandle.event.lcm.Event
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
+import org.springframework.util.SerializationUtils
 import org.testcontainers.spock.Testcontainers
 
 import java.time.Duration
 
-@SpringBootTest(classes = [EventsPublisher, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
 class LcmEventsPublisherSpec extends MessagingBaseSpec {
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
 
     def testTopic = 'ncmp-events-test'
 
     @SpringBean
-    EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(kafkaTemplate)
+    EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
@@ -55,21 +57,37 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
     def 'Produce and Consume Lcm Event'() {
         given: 'event key and event data'
             def eventKey = 'lcm'
+            def eventId = 'test-uuid'
+            def eventCorrelationId = 'cmhandle-test'
+            def eventSource = 'org.onap.ncmp'
+            def eventTime = '2022-12-31T20:30:40.000+0000'
+            def eventType = 'org.onap.ncmp.cmhandle.lcm.event'
+            def eventSchema = 'org.onap.ncmp.cmhandle.lcm.event'
+            def eventSchemaVersion = 'v1'
             def eventData = new LcmEvent(
-                eventId: 'test-uuid',
-                eventCorrelationId: 'cmhandle-as-correlationid',
-                eventSource: 'org.onap.ncmp',
-                eventTime: '2022-12-31T20:30:40.000+0000',
-                eventType: 'org.onap.ncmp.cmhandle.lcm.event',
-                eventSchema: 'org.onap.ncmp.cmhandle.lcm.event',
-                eventSchemaVersion: 'v1',
+                eventId: eventId,
+                eventCorrelationId: eventCorrelationId,
+                eventSource: eventSource,
+                eventTime: eventTime,
+                eventType: eventType,
+                eventSchema: eventSchema,
+                eventSchemaVersion: eventSchemaVersion,
                 event: new Event(cmHandleId: 'cmhandle-test'))
+        and: 'we have a event header'
+            def eventHeader = [
+                eventId           : eventId,
+                eventCorrelationId: eventCorrelationId,
+                eventSource       : eventSource,
+                eventTime         : eventTime,
+                eventType         : eventType,
+                eventSchema       : eventSchema,
+                eventSchemaVersion: eventSchemaVersion]
         and: 'consumer has a subscription'
-            kafkaConsumer.subscribe([testTopic] as List<String>)
+            legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
         when: 'an event is published'
-            lcmEventsPublisher.publishEvent(testTopic, eventKey, eventData)
+            lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
         and: 'topic is polled'
-            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+            def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'record key matches the expected event key'
@@ -79,5 +97,8 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
             def expectedJsonString = TestUtils.getResourceFileContent('expectedLcmEvent.json')
             def expectedLcmEvent = jsonObjectMapper.convertJsonString(expectedJsonString, LcmEvent.class)
             assert expectedLcmEvent == jsonObjectMapper.convertJsonString(record.value, LcmEvent.class)
+        and: 'record header matches the expected parameters'
+            assert SerializationUtils.deserialize(record.headers().lastHeader('eventId').value()) == eventId
+            assert SerializationUtils.deserialize(record.headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId
     }
 }