Patch # 1: Data operation response event (NCMP → Client App) to comply with CloudEvents 92/134792/14
authorsourabh_sourabh <sourabh.sourabh@est.tech>
Thu, 8 Jun 2023 14:25:33 +0000 (15:25 +0100)
committersourabh_sourabh <sourabh.sourabh@est.tech>
Mon, 19 Jun 2023 14:05:31 +0000 (15:05 +0100)
 - Removed header definitions (since we now use CloudEvents)
 - Used 'dataOperation' instead of batch where appropriate.
 - Modified test json

Issue-ID: CPS-1724
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Change-Id: Ic0f65297b944adf9cf5f3c2cbec679a031a675ec
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json [deleted file]
cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json [moved from cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json with 73% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java with 82% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java with 60% similarity]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy [moved from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy with 72% similarity]
cps-ncmp-service/src/test/resources/dataOperationEvent.json [moved from cps-ncmp-service/src/test/resources/batchDataEvent.json with 57% similarity]

diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json
deleted file mode 100644 (file)
index bbcadcd..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-{
-  "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-headers:1.0.0",
-  "$ref": "#/definitions/BatchEventHeaders",
-  "definitions": {
-    "BatchEventHeaders": {
-      "description": "The header information of the Batch event.",
-      "type": "object",
-      "javaType" : "org.onap.cps.ncmp.events.async.BatchEventHeadersV1",
-      "properties": {
-        "eventId": {
-          "description": "The unique id for identifying the event.",
-          "type": "string"
-        },
-        "eventCorrelationId": {
-          "description": "The request id received by NCMP as an acknowledgement.",
-          "type": "string"
-        },
-        "eventTime": {
-          "description": "The time of the event. It should be in RFC format ('yyyy-MM-dd'T'HH:mm:ss.SSSZ').",
-          "type": "string"
-        },
-        "eventTarget": {
-          "description": "The destination topic to forward the consumed event.",
-          "type": "string"
-        },
-        "eventSource": {
-          "description": "The source of the event.",
-          "type": "string"
-        },
-        "eventType": {
-          "description": "The type of the Batch event.",
-          "type": "string"
-        },
-        "eventSchema": {
-          "description": "The schema of the Batch event payload.",
-          "type": "string"
-        },
-        "eventSchemaVersion": {
-          "description": "The schema version of the Batch event payload.",
-          "type": "string"
-        }
-      },
-      "required": [
-        "eventId",
-        "eventCorrelationId",
-        "eventTarget",
-        "eventType",
-        "eventSchema",
-        "eventSchemaVersion"
-      ],
-      "additionalProperties": false
-    }
-  }
-}
\ No newline at end of file
@@ -1,19 +1,18 @@
 {
   "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-schema:1.0.0",
-  "$ref": "#/definitions/BatchDataResponseEvent",
+  "$id": "urn:cps:org.onap.cps.ncmp.events.async:data-operation-event-schema:1.0.0",
+  "$ref": "#/definitions/DataOperationEvent",
   "definitions": {
-    "BatchDataResponseEvent": {
-      "description": "The payload of batch event.",
+    "DataOperationEvent": {
+      "description": "The payload of data operation event.",
       "type": "object",
-      "javaType" : "org.onap.cps.ncmp.events.async.BatchDataResponseEventV1",
+      "javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent",
       "properties": {
-        "event": {
+        "data": {
           "description": "The payload content of the requested data.",
           "type": "object",
-          "javaType" : "org.onap.cps.ncmp.events.async.BatchDataEvent",
           "properties": {
-            "batch-responses": {
+            "responses": {
               "description": "An array of batch responses which contains both success and failure",
               "type": "array",
               "items": {
                     "description": "Id's of the cmhandles",
                     "type": "array"
                   },
-                  "status-code": {
+                  "statusCode": {
                     "description": "which says success or failure (0-99) are for success and (100-199) are for failure",
                     "type": "string"
                   },
-                  "status-message": {
+                  "statusMessage": {
                     "description": "Human readable message, Which says what the response has",
                     "type": "string"
                   },
-                  "data": {
+                  "responseContent": {
                     "description": "Contains the requested data response.",
                     "type": "object",
                     "existingJavaType": "java.lang.Object",
                 "required": [
                   "operationId",
                   "ids",
-                  "status-code",
-                  "status-message"
+                  "statusCode",
+                  "statusMessage"
                 ],
                 "additionalProperties": false
               }
             }
           },
           "required": [
-            "batch-responses"
+            "responses"
           ],
           "additionalProperties": false
         }
       },
       "required": [
-        "event"
+        "data"
       ],
       "additionalProperties": false
     }
@@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
 
 /**
- * Batch Record filter strategy, which helps to filter the consumer records.
+ * Data operation record filter strategy, which helps to filter the consumer records.
  *
  */
 @Configuration
-public class BatchRecordFilterStrategy {
+public class DataOperationRecordFilterStrategy {
 
     /**
      *  Filtering the consumer records based on the eventType header, It
@@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy {
      * @return boolean value.
      */
     @Bean
-    public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() {
+    public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
         return consumedRecord -> {
             final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
             if (eventTypeHeader == null) {
@@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy {
             }
             final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
             return !(eventTypeHeaderValue != null
-                    && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+                    && eventTypeHeaderValue.contains("DataOperationEvent"));
         };
     }
 }
@@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
 /**
- * Listener for cps-ncmp async batch events.
+ * Listener for cps-ncmp async data operation events.
  */
 @Component
 @Slf4j
 @RequiredArgsConstructor
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class NcmpAsyncBatchEventConsumer {
+public class NcmpAsyncDataOperationEventConsumer {
 
-    private final EventsPublisher<BatchDataResponseEventV1> eventsPublisher;
+    private final EventsPublisher<DataOperationEvent> eventsPublisher;
 
     /**
-     * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic'
+     * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
      * and publish the same to the client specified topic.
      *
-     * @param batchEventConsumerRecord consuming event as a ConsumerRecord.
+     * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord.
      */
     @KafkaListener(
             topics = "${app.ncmp.async-m2m.topic}",
-            filter = "filterBatchDataResponseEvent",
-            groupId = "ncmp-batch-event-group",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"})
-    public void consumeAndPublish(final ConsumerRecord<String, BatchDataResponseEventV1> batchEventConsumerRecord) {
-        log.info("Consuming event payload {} ...", batchEventConsumerRecord.value());
+            filter = "includeDataOperationEventsOnly",
+            groupId = "ncmp-data-operation-event-group",
+            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
+    public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
+                                              dataOperationEventConsumerRecord) {
+        log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
         final String eventTarget = SerializationUtils
-                .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value());
+                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
         final String eventId = SerializationUtils
-                .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value());
-        eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(),
-                batchEventConsumerRecord.value());
+                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
+        eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
+                dataOperationEventConsumerRecord.value());
     }
 }
@@ -28,7 +28,7 @@ import org.apache.kafka.common.header.internals.RecordHeader
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
@@ -37,43 +37,41 @@ import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy
 import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
-
 import java.time.Duration
 
-@SpringBootTest(classes = [EventsPublisher, NcmpAsyncBatchEventConsumer, BatchRecordFilterStrategy,JsonObjectMapper,
-                ObjectMapper])
+@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper])
 @Testcontainers
 @DirtiesContext
-class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
+class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
-    NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher)
+    NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
     @Autowired
-    RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
+    RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
 
     def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
     def static clientTopic = 'client-topic'
-    def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1'
+    def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
     def 'Consume and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
             legacyEventKafkaConsumer.subscribe([clientTopic])
-        and: 'consumer record for batch event'
-            def consumerRecordIn = createConsumerRecord(batchEventType)
-        when: 'the batch event is consumed and published to client specified topic'
-            asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn)
+        and: 'consumer record for data operation event'
+            def consumerRecordIn = createConsumerRecord(dataOperationType)
+        when: 'the data operation event is consumed and published to client specified topic'
+            asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
         and: 'the client specified topic is polled'
             def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
         then: 'verifying consumed event operationID is same as published event operationID'
-            def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId
-            def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId
+            def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
+            def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
             assert operationIdIn == operationIdOut
     }
 
@@ -85,14 +83,14 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
         then: 'the event is #description'
             assert result == expectedResult
         where: 'filter the event based on the eventType #eventType'
-            description                                     | eventType       || expectedResult
-            'not filtered(the consumer will see the event)' | batchEventType  || false
-            'filtered(the consumer will not see the event)' | 'wrongType'     || true
+            description                                     | eventType         || expectedResult
+            'not filtered(the consumer will see the event)' | dataOperationType || false
+            'filtered(the consumer will not see the event)' | 'wrongType'       || true
     }
 
     def createConsumerRecord(eventTypeAsString) {
-        def jsonData = TestUtils.getResourceFileContent('batchDataEvent.json')
-        def testEventSent = jsonObjectMapper.convertJsonString(jsonData, BatchDataResponseEventV1.class)
+        def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
+        def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
         def eventTarget = SerializationUtils.serialize(clientTopic)
         def eventType = SerializationUtils.serialize(eventTypeAsString)
         def eventId = SerializationUtils.serialize('12345')
@@ -1,15 +1,15 @@
 {
-  "event":{
-    "batch-responses":[
+  "data":{
+    "responses":[
       {
         "operationId":"1",
         "ids":[
           "123",
           "124"
         ],
-        "status-code":1,
-        "status-message":"Batch operation success on the above cmhandle ids ",
-        "data":{
+        "statusCode":1,
+        "statusMessage":"Batch operation success on the above cmhandle ids ",
+        "responseContent":{
           "ietf-netconf-monitoring:netconf-state":{
             "schemas":{
               "schema":[
             }
           }
         }
-      },
-      {
-        "operationId":"101",
-        "ids":[
-          "456",
-          "457"
-        ],
-        "status-code":101,
-        "status-message":"cmHandle(s) do not exist",
-        "data":{
-          "error":{
-            "message":"cmHandle(s) do not exist"
-          }
-        }
       }
     ]
   }