Merge "Patch # 3: Data operation response event (NCMP → Client App) to comply with...
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / async / NcmpAsyncDataOperationEventConsumerSpec.groovy
index d9b9ce6..7f8469a 100644
 package org.onap.cps.ncmp.api.impl.async
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.commons.lang3.SerializationUtils
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
@@ -45,41 +50,56 @@ import java.time.Duration
 class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
-    NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
+    NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
     @Autowired
-    RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
+    RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
 
-    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
+    @Autowired
+    ObjectMapper objectMapper
+
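+    // polls the client topic with a CloudEventDeserializer, so record values come back as CloudEvent objects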
+    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
     def static clientTopic = 'client-topic'
     def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
     def 'Consume and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
-            legacyEventKafkaConsumer.subscribe([clientTopic])
+            cloudEventKafkaConsumer.subscribe([clientTopic])
         and: 'consumer record for data operation event'
             def consumerRecordIn = createConsumerRecord(dataOperationType)
         when: 'the data operation event is consumed and published to client specified topic'
-            asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
+            objectUnderTest.consumeAndPublish(consumerRecordIn)
         and: 'the client specified topic is polled'
-            def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
-        then: 'verifying consumed event operationID is same as published event operationID'
-            def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
-            def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
-            assert operationIdIn == operationIdOut
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+        then: 'verify cloud compliant headers'
+            def consumerRecordOutHeaders = consumerRecordOut.headers()
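+            // the CloudEvents Kafka binding (binary mode) transports event attributes as 'ce_'-prefixed headers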
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
+        and: 'verify that extension is included into header'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
+        and: 'map consumer record to expected event type'
+            def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
+                    PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+        and: 'verify published response data properties'
+            def response = dataOperationResponseEvent.data.responses[0]
+            assert response.operationId == 'some-operation-id'
+            assert response.statusCode == 'any-success-status-code'
+            assert response.statusMessage == 'Successfully applied changes'
+            assert response.responseContent as String == '[some-key:some-value]'
     }
 
     def 'Filter an event with type #eventType'() {
         given: 'consumer record for event with type #eventType'
             def consumerRecord = createConsumerRecord(eventType)
         when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy'
-            def result = recordFilterStrategy.filter(consumerRecord)
+            def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
         then: 'the event is #description'
             assert result == expectedResult
         where: 'filter the event based on the eventType #eventType'
@@ -90,14 +110,27 @@ class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     def createConsumerRecord(eventTypeAsString) {
         def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
-        def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
-        def eventTarget = SerializationUtils.serialize(clientTopic)
-        def eventType = SerializationUtils.serialize(eventTypeAsString)
-        def eventId = SerializationUtils.serialize('12345')
-        def consumerRecord = new ConsumerRecord<String, Object>(clientTopic, 1, 1L, '123', testEventSent)
-        consumerRecord.headers().add(new RecordHeader('eventId', eventId))
-        consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget))
-        consumerRecord.headers().add(new RecordHeader('eventType', eventType))
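+        // parse the JSON test fixture into a DataOperationEvent, then write it back out as the bytes forming the cloud event payload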
+        def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
+
+        CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes)
+
+        def headers = new RecordHeaders()
+        def cloudEventSerializer = new CloudEventSerializer()
+        cloudEventSerializer.serialize(clientTopic, headers, cloudEvent)
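+        // serialize() is called only for its side effect: it writes the event's attributes into 'headers' as 'ce_'-prefixed entries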
+
+        def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
+        headers.forEach(header -> consumerRecord.headers().add(header))
         return consumerRecord
     }
+
+    def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) {
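+        // the 'correlationid' and 'destination' extensions surface on the wire as the ce_correlationid and ce_destination headers asserted above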
+        return CloudEventBuilder.v1()
+                .withId('some-uuid')
+                .withType(eventTypeAsString)
+                .withSource(URI.create('sample-test-source'))
+                .withData(testEventSentAsBytes)
+                .withExtension('correlationid', 'request-id')
+                .withExtension('destination', clientTopic)
+                .build()
+    }
 }