Patch #3: Data operation response event (NCMP → Client App) to comply with CloudEvents 04/135004/5
author sourabh_sourabh <sourabh.sourabh@est.tech>
Mon, 19 Jun 2023 16:35:15 +0000 (17:35 +0100)
committer sourabh_sourabh <sourabh.sourabh@est.tech>
Wed, 21 Jun 2023 12:34:43 +0000 (13:34 +0100)
- Modified the data operation record filter strategy to consume CloudEvents.
- Modified the NCMP data operation event consumer to read CloudEvents
  headers (prefixed with ce_).
- Modified the events publisher to support both legacy and cloud events,
  selected by event type (legacy events use the legacy Kafka template;
  cloud events use the cloud Kafka template).
- Introduced a new method on the JSON object mapper to convert a JSON
  object to bytes.
- Modified the data operation consumer spec to produce a CloudEvent and validate it.
- Added a Kafka integration test for record filtering (see the
  illustrative CloudEvents sketch after this list).
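
An illustrative sketch (Java; hedged: the identifiers and values are taken from the
tests in this patch, not from any production code path) of how such a CloudEvent maps
onto the ce_-prefixed Kafka headers in the CloudEvents Kafka protocol binding's binary
content mode:

    import java.net.URI;
    import io.cloudevents.CloudEvent;
    import io.cloudevents.core.builder.CloudEventBuilder;

    public class DataOperationCloudEventSketch {
        public static CloudEvent sampleEvent() {
            return CloudEventBuilder.v1()
                    .withId("some-uuid")                          // becomes Kafka header ce_id
                    .withType("org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent") // ce_type
                    .withSource(URI.create("sample-test-source")) // ce_source
                    .withExtension("destination", "client-topic") // ce_destination (extension)
                    .withExtension("correlationid", "request-id") // ce_correlationid (extension)
                    .build();
        }
    }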

Issue-ID: CPS-1724

Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Change-Id: Ide701b1ff952f57413cb4e4aa0d55c08753f0298
cps-ncmp-service/pom.xml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy
cps-ncmp-service/src/test/resources/application.yml
cps-ncmp-service/src/test/resources/dataOperationEvent.json

index 19ef988..608141e 100644 (file)
             <artifactId>spock-spring</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>kafka</artifactId>
index 9e2b66a..ce666b1 100644 (file)
@@ -20,9 +20,8 @@
 
 package org.onap.cps.ncmp.api.impl.async;
 
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -41,15 +40,11 @@ public class DataOperationRecordFilterStrategy {
      * @return boolean value.
      */
     @Bean
-    public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
+    public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
         return consumedRecord -> {
-            final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
-            if (eventTypeHeader == null) {
-                return false;
-            }
-            final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
-            return !(eventTypeHeaderValue != null
-                    && eventTypeHeaderValue.contains("DataOperationEvent"));
+            final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(
+                    consumedRecord.headers(), "ce_type");
+            if (eventTypeHeaderValue == null) {
+                // no ce_type header present: not a CloudEvent, so discard the record
+                return true;
+            }
+            return !eventTypeHeaderValue.contains("DataOperationEvent");
         };
     }
 }
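
Note: spring-kafka's RecordFilterStrategy returns true to discard a record. A hedged
restatement of the filter above, spelling out that double negative:

    // Equivalent restatement (illustrative only): returning true means "discard this record".
    RecordFilterStrategy<String, CloudEvent> filter = consumedRecord -> {
        final String ceType = KafkaHeaders.getParsedKafkaHeader(consumedRecord.headers(), "ce_type");
        final boolean isDataOperationEvent = ceType != null && ceType.contains("DataOperationEvent");
        return !isDataOperationEvent; // discard everything except data operation events
    };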
index 995a4d5..4a0ec5c 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.async;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -39,7 +39,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class NcmpAsyncDataOperationEventConsumer {
 
-    private final EventsPublisher<DataOperationEvent> eventsPublisher;
+    private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
      * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
@@ -52,14 +52,12 @@ public class NcmpAsyncDataOperationEventConsumer {
             filter = "includeDataOperationEventsOnly",
             groupId = "ncmp-data-operation-event-group",
             properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
-    public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
-                                              dataOperationEventConsumerRecord) {
+    public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
         log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
-        final String eventTarget = SerializationUtils
-                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
-        final String eventId = SerializationUtils
-                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
-        eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
-                dataOperationEventConsumerRecord.value());
+        final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
+                dataOperationEventConsumerRecord.headers(), "ce_destination");
+        final String eventId = KafkaHeaders.getParsedKafkaHeader(
+                dataOperationEventConsumerRecord.headers(), "ce_id");
+        eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value());
     }
 }
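
The publishCloudEvent method called above is not part of this hunk. A minimal sketch of
what it could look like, assuming EventsPublisher holds a KafkaTemplate<String, CloudEvent>
field named cloudEventKafkaTemplate (both names are assumptions here):

    public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
        final ListenableFuture<SendResult<String, CloudEvent>> eventFuture =
                cloudEventKafkaTemplate.send(topicName, eventKey, event);
        // handleCallback is hypothetical; the legacy publishEvent path suggests a
        // similar success/failure logging callback would be attached here.
        eventFuture.addCallback(handleCallback(topicName));
    }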
index e61e772..05c731d 100644 (file)
@@ -71,7 +71,7 @@ public class EventsPublisher<T> {
      * @param event     message payload
      * @deprecated This method is not needed anymore since the use of headers will be in place.
      */
-    @Deprecated
+    @Deprecated(forRemoval = true)
     public void publishEvent(final String topicName, final String eventKey, final T event) {
         final ListenableFuture<SendResult<String, T>> eventFuture
                 = legacyKafkaEventTemplate.send(topicName, eventKey, event);
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
new file mode 100644 (file)
index 0000000..3db8520
--- /dev/null
@@ -0,0 +1,61 @@
+package org.onap.cps.ncmp.api.impl.async
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.kafka.CloudEventSerializer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry
+import org.springframework.kafka.test.utils.ContainerTestUtils
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+import java.util.concurrent.TimeUnit
+
+@SpringBootTest(classes = [NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy])
+@DirtiesContext
+@Testcontainers
+@EnableAutoConfiguration
+class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec {
+
+    @Autowired
+    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
+
+    @SpringBean
+    EventsPublisher mockEventsPublisher = Mock()
+
+    def activateListeners() {
+        kafkaListenerEndpointRegistry.getListenerContainers().forEach(
+                messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
+        )
+    }
+
+    def 'Filtering Events.'() {
+        given: 'a cloud event of type: #eventType'
+            def cloudEvent = CloudEventBuilder.v1().withId("some-uuid")
+                    .withType(eventType)
+                    .withSource(URI.create("sample-test-source"))
+                    .build();
+        and: 'activate message listener container'
+            activateListeners()
+        when: 'send the cloud event'
+            ProducerRecord<String, CloudEvent> record = new ProducerRecord<>('ncmp-async-m2m', cloudEvent)
+            KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
+            producer.send(record);
+        and: 'wait a little for async processing of message'
+            TimeUnit.MILLISECONDS.sleep(100)
+        then: 'the event has only been forwarded for the correct type'
+            expectedNumberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(_, _, _)
+        where:
+            eventType                                            || expectedNumberOfCallsToPublishForwardedEvent
+            'DataOperationEvent'                                 || 1
+            'other type'                                         || 0
+            'any type containing the word "DataOperationEvent"'  || 1
+    }
+}
+
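
The eventProducerConfigProperties helper comes from MessagingBaseSpec, which is not part
of this patch; a sketch of the shape it presumably returns (the broker address and class
name here are assumptions, illustrative only):

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.cloudevents.kafka.CloudEventSerializer;

    public class ProducerConfigSketch {
        public static Map<String, Object> eventProducerConfigProperties() {
            return Map.of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // Testcontainers broker in practice
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
        }
    }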
index d9b9ce6..7f8469a 100644 (file)
 package org.onap.cps.ncmp.api.impl.async
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.commons.lang3.SerializationUtils
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import io.cloudevents.kafka.impl.KafkaHeaders
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
@@ -45,41 +50,56 @@ import java.time.Duration
 class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
-    NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
+    NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
     @Autowired
-    RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
+    RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
 
-    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
+    @Autowired
+    ObjectMapper objectMapper
+
+    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
     def static clientTopic = 'client-topic'
     def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
     def 'Consume and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
-            legacyEventKafkaConsumer.subscribe([clientTopic])
+            cloudEventKafkaConsumer.subscribe([clientTopic])
         and: 'consumer record for data operation event'
             def consumerRecordIn = createConsumerRecord(dataOperationType)
         when: 'the data operation event is consumed and published to client specified topic'
-            asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
+            objectUnderTest.consumeAndPublish(consumerRecordIn)
         and: 'the client specified topic is polled'
-            def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
-        then: 'verifying consumed event operationID is same as published event operationID'
-            def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
-            def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
-            assert operationIdIn == operationIdOut
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+        then: 'verify CloudEvents-compliant headers'
+            def consumerRecordOutHeaders = consumerRecordOut.headers()
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
+        and: 'verify that the extension is included in the headers'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
+        and: 'map consumer record to expected event type'
+            def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
+                    PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+        and: 'verify published response data properties'
+            def response = dataOperationResponseEvent.data.responses[0]
+            response.operationId == 'some-operation-id'
+            response.statusCode == 'any-success-status-code'
+            response.statusMessage == 'Successfully applied changes'
+            response.responseContent as String == '[some-key:some-value]'
     }
 
     def 'Filter an event with type #eventType'() {
         given: 'consumer record for event with type #eventType'
             def consumerRecord = createConsumerRecord(eventType)
         when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy'
-            def result = recordFilterStrategy.filter(consumerRecord)
+            def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
         then: 'the event is #description'
             assert result == expectedResult
         where: 'filter the event based on the eventType #eventType'
@@ -90,14 +110,27 @@ class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     def createConsumerRecord(eventTypeAsString) {
         def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
-        def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
-        def eventTarget = SerializationUtils.serialize(clientTopic)
-        def eventType = SerializationUtils.serialize(eventTypeAsString)
-        def eventId = SerializationUtils.serialize('12345')
-        def consumerRecord = new ConsumerRecord<String, Object>(clientTopic, 1, 1L, '123', testEventSent)
-        consumerRecord.headers().add(new RecordHeader('eventId', eventId))
-        consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget))
-        consumerRecord.headers().add(new RecordHeader('eventType', eventType))
+        def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
+
+        CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes)
+
+        def headers = new RecordHeaders()
+        def cloudEventSerializer = new CloudEventSerializer()
+        cloudEventSerializer.serialize(clientTopic, headers, cloudEvent)
+
+        def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
+        headers.forEach(header -> consumerRecord.headers().add(header))
         return consumerRecord
     }
+
+    def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) {
+        return CloudEventBuilder.v1()
+                .withId("some-uuid")
+                .withType(eventTypeAsString)
+                .withSource(URI.create("sample-test-source"))
+                .withData(testEventSentAsBytes)
+                .withExtension("correlationid", "request-id")
+                .withExtension("destination", clientTopic)
+                .build();
+    }
 }
index 4a9e3ee..5cc70e2 100644 (file)
@@ -39,7 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
-
 import java.time.Duration
 
 @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
@@ -85,8 +84,8 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
             assert records.size() == 1
         and: 'record can be converted to AVC event'
             def record = records.iterator().next()
-            def cloudevent = record.value() as CloudEvent
-            def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+            def cloudEvent = record.value() as CloudEvent
+            def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue()
         and: 'we have correct headers forwarded where correlation id matches'
             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
         and: 'event id differs(as per requirement) between consumed and forwarded'
index 3ae6348..7bdf335 100644 (file)
@@ -22,7 +22,6 @@ package org.onap.cps.ncmp.api.inventory
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import spock.lang.Specification
-
 import java.time.OffsetDateTime
 import java.time.ZoneOffset
 import java.time.format.DateTimeFormatter
index 197bfda..df34f84 100644 (file)
@@ -26,6 +26,8 @@ spring:
 
 app:
     ncmp:
+        async-m2m:
+            topic: ncmp-async-m2m
         avc:
             subscription-topic: cm-avc-subscription
             cm-events-topic: cm-events
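
The new async-m2m.topic property is presumably what the consumer's @KafkaListener binds
to; the topics attribute sits outside the diff context shown earlier, so this wiring is a
hedged sketch, not part of the patch:

    // Assumed listener wiring for the new property (illustrative only).
    @KafkaListener(topics = "${app.ncmp.async-m2m.topic}",
            filter = "includeDataOperationEventsOnly",
            groupId = "ncmp-data-operation-event-group")
    public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> consumerRecord) {
        // forward to the client topic taken from the ce_destination header
    }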
index 42268c0..0a32f38 100644 (file)
@@ -1,30 +1,15 @@
 {
-  "data":{
-    "responses":[
+  "data": {
+    "responses": [
       {
-        "operationId":"1",
-        "ids":[
-          "123",
-          "124"
+        "operationId": "some-operation-id",
+        "ids": [
+          "cm-handle-id"
         ],
-        "statusCode":1,
-        "statusMessage":"Batch operation success on the above cmhandle ids ",
-        "responseContent":{
-          "ietf-netconf-monitoring:netconf-state":{
-            "schemas":{
-              "schema":[
-                {
-                  "identifier":"ietf-tls-server",
-                  "version":"2016-11-02",
-                  "format":"ietf-netconf-monitoring:yang",
-                  "namespace":"urn:ietf:params:xml:ns:yang:ietf-tls-server",
-                  "location":[
-                    "NETCONF"
-                  ]
-                }
-              ]
-            }
-          }
+        "statusCode": "any-success-status-code",
+        "statusMessage": "Successfully applied changes",
+        "responseContent": {
+          "some-key": "some-value"
         }
       }
     ]