From f4c3f0fcebec726ea74b44f9bca3b68e66176671 Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Thu, 8 Jun 2023 15:25:33 +0100 Subject: [PATCH] =?utf8?q?Patch=20#=201:=20Data=20operation=20response=20e?= =?utf8?q?vent=20(NCMP=20=E2=86=92=20Client=20App)=20to=20comply=20with=20?= =?utf8?q?CloudEvents?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit - Removed header definitions (since we now use CloudEvents) - Used 'dataOperation' instead of batch where appropriate. - Modified test json Issue-ID: CPS-1724 Signed-off-by: sourabh_sourabh Change-Id: Ic0f65297b944adf9cf5f3c2cbec679a031a675ec Signed-off-by: sourabh_sourabh --- .../schemas/async/batch-event-headers-1.0.0.json | 55 ---------------------- ...json => data-operation-event-schema-1.0.0.json} | 29 ++++++------ ...java => DataOperationRecordFilterStrategy.java} | 10 ++-- ...va => NcmpAsyncDataOperationEventConsumer.java} | 31 ++++++------ ...NcmpAsyncDataOperationEventConsumerSpec.groovy} | 38 +++++++-------- ...batchDataEvent.json => dataOperationEvent.json} | 24 ++-------- 6 files changed, 58 insertions(+), 129 deletions(-) delete mode 100644 cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json rename cps-ncmp-events/src/main/resources/schemas/async/{batch-event-schema-1.0.0.json => data-operation-event-schema-1.0.0.json} (73%) rename cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/{BatchRecordFilterStrategy.java => DataOperationRecordFilterStrategy.java} (82%) rename cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/{NcmpAsyncBatchEventConsumer.java => NcmpAsyncDataOperationEventConsumer.java} (60%) rename cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/{NcmpAsyncBatchEventConsumerSpec.groovy => NcmpAsyncDataOperationEventConsumerSpec.groovy} (72%) rename cps-ncmp-service/src/test/resources/{batchDataEvent.json => dataOperationEvent.json} (57%) diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json deleted file mode 100644 index bbcadcd0f..000000000 --- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-headers:1.0.0", - "$ref": "#/definitions/BatchEventHeaders", - "definitions": { - "BatchEventHeaders": { - "description": "The header information of the Batch event.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchEventHeadersV1", - "properties": { - "eventId": { - "description": "The unique id for identifying the event.", - "type": "string" - }, - "eventCorrelationId": { - "description": "The request id received by NCMP as an acknowledgement.", - "type": "string" - }, - "eventTime": { - "description": "The time of the event. 
It should be in RFC format ('yyyy-MM-dd'T'HH:mm:ss.SSSZ').", - "type": "string" - }, - "eventTarget": { - "description": "The destination topic to forward the consumed event.", - "type": "string" - }, - "eventSource": { - "description": "The source of the event.", - "type": "string" - }, - "eventType": { - "description": "The type of the Batch event.", - "type": "string" - }, - "eventSchema": { - "description": "The schema of the Batch event payload.", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The schema version of the Batch event payload.", - "type": "string" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventTarget", - "eventType", - "eventSchema", - "eventSchemaVersion" - ], - "additionalProperties": false - } - } -} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json similarity index 73% rename from cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json rename to cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json index da836ff16..308e3068d 100644 --- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json +++ b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json @@ -1,19 +1,18 @@ { "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-schema:1.0.0", - "$ref": "#/definitions/BatchDataResponseEvent", + "$id": "urn:cps:org.onap.cps.ncmp.events.async:data-operation-event-schema:1.0.0", + "$ref": "#/definitions/DataOperationEvent", "definitions": { - "BatchDataResponseEvent": { - "description": "The payload of batch event.", + "DataOperationEvent": { + "description": "The payload of data operation event.", "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchDataResponseEventV1", + "javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent", "properties": { - "event": { + "data": { "description": "The payload content of the requested data.", "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchDataEvent", "properties": { - "batch-responses": { + "responses": { "description": "An array of batch responses which contains both success and failure", "type": "array", "items": { @@ -27,15 +26,15 @@ "description": "Id's of the cmhandles", "type": "array" }, - "status-code": { + "statusCode": { "description": "which says success or failure (0-99) are for success and (100-199) are for failure", "type": "string" }, - "status-message": { + "statusMessage": { "description": "Human readable message, Which says what the response has", "type": "string" }, - "data": { + "responseContent": { "description": "Contains the requested data response.", "type": "object", "existingJavaType": "java.lang.Object", @@ -45,21 +44,21 @@ "required": [ "operationId", "ids", - "status-code", - "status-message" + "statusCode", + "statusMessage" ], "additionalProperties": false } } }, "required": [ - "batch-responses" + "responses" ], "additionalProperties": false } }, "required": [ - "event" + "data" ], "additionalProperties": false } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java similarity index 82% rename from 
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java rename to cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java index b343d70a7..9e2b66a2c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java @@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.header.Header; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; /** - * Batch Record filter strategy, which helps to filter the consumer records. + * Data operation record filter strategy, which helps to filter the consumer records. * */ @Configuration -public class BatchRecordFilterStrategy { +public class DataOperationRecordFilterStrategy { /** * Filtering the consumer records based on the eventType header, It @@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy { * @return boolean value. */ @Bean - public RecordFilterStrategy filterBatchDataResponseEvent() { + public RecordFilterStrategy includeDataOperationEventsOnly() { return consumedRecord -> { final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); if (eventTypeHeader == null) { @@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy { } final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); return !(eventTypeHeaderValue != null - && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + && eventTypeHeaderValue.contains("DataOperationEvent")); }; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java similarity index 60% rename from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java rename to cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java index 2a332d003..995a4d5a6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java @@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** - * Listener for cps-ncmp async batch events. + * Listener for cps-ncmp async data operation events. 
*/ @Component @Slf4j @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class NcmpAsyncBatchEventConsumer { +public class NcmpAsyncDataOperationEventConsumer { - private final EventsPublisher eventsPublisher; + private final EventsPublisher eventsPublisher; /** - * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic' + * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic' * and publish the same to the client specified topic. * - * @param batchEventConsumerRecord consuming event as a ConsumerRecord. + * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord. */ @KafkaListener( topics = "${app.ncmp.async-m2m.topic}", - filter = "filterBatchDataResponseEvent", - groupId = "ncmp-batch-event-group", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"}) - public void consumeAndPublish(final ConsumerRecord batchEventConsumerRecord) { - log.info("Consuming event payload {} ...", batchEventConsumerRecord.value()); + filter = "includeDataOperationEventsOnly", + groupId = "ncmp-data-operation-event-group", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"}) + public void consumeAndPublish(final ConsumerRecord + dataOperationEventConsumerRecord) { + log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); final String eventTarget = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value()); final String eventId = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value()); - eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(), - batchEventConsumerRecord.value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value()); + eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(), + dataOperationEventConsumerRecord.value()); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy similarity index 72% rename from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy rename to cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy index 02071cd8c..d9b9ce6db 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy @@ -28,7 +28,7 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1 +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean @@ -37,43 +37,41 @@ import 
org.springframework.boot.test.context.SpringBootTest import org.springframework.kafka.listener.adapter.RecordFilterStrategy import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers - import java.time.Duration -@SpringBootTest(classes = [EventsPublisher, NcmpAsyncBatchEventConsumer, BatchRecordFilterStrategy,JsonObjectMapper, - ObjectMapper]) +@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper]) @Testcontainers @DirtiesContext -class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { +class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher asyncBatchEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher) + NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) @Autowired JsonObjectMapper jsonObjectMapper @Autowired - RecordFilterStrategy recordFilterStrategy + RecordFilterStrategy recordFilterStrategy def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) def static clientTopic = 'client-topic' - def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1' + def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' def 'Consume and publish event to client specified topic'() { given: 'consumer subscribing to client topic' legacyEventKafkaConsumer.subscribe([clientTopic]) - and: 'consumer record for batch event' - def consumerRecordIn = createConsumerRecord(batchEventType) - when: 'the batch event is consumed and published to client specified topic' - asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn) + and: 'consumer record for data operation event' + def consumerRecordIn = createConsumerRecord(dataOperationType) + when: 'the data operation event is consumed and published to client specified topic' + asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn) and: 'the client specified topic is polled' def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] then: 'verifying consumed event operationID is same as published event operationID' - def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId - def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId + def operationIdIn = consumerRecordIn.value.data.responses[0].operationId + def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId assert operationIdIn == operationIdOut } @@ -85,14 +83,14 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { then: 'the event is #description' assert result == expectedResult where: 'filter the event based on the eventType #eventType' - description | eventType || expectedResult - 'not filtered(the consumer will see the event)' | batchEventType || false - 'filtered(the consumer will not see the event)' | 'wrongType' || true + description | eventType || expectedResult + 
'not filtered(the consumer will see the event)' | dataOperationType || false + 'filtered(the consumer will not see the event)' | 'wrongType' || true } def createConsumerRecord(eventTypeAsString) { - def jsonData = TestUtils.getResourceFileContent('batchDataEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, BatchDataResponseEventV1.class) + def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class) def eventTarget = SerializationUtils.serialize(clientTopic) def eventType = SerializationUtils.serialize(eventTypeAsString) def eventId = SerializationUtils.serialize('12345') diff --git a/cps-ncmp-service/src/test/resources/batchDataEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json similarity index 57% rename from cps-ncmp-service/src/test/resources/batchDataEvent.json rename to cps-ncmp-service/src/test/resources/dataOperationEvent.json index 49eb273f5..42268c0ef 100644 --- a/cps-ncmp-service/src/test/resources/batchDataEvent.json +++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json @@ -1,15 +1,15 @@ { - "event":{ - "batch-responses":[ + "data":{ + "responses":[ { "operationId":"1", "ids":[ "123", "124" ], - "status-code":1, - "status-message":"Batch operation success on the above cmhandle ids ", - "data":{ + "statusCode":1, + "statusMessage":"Batch operation success on the above cmhandle ids ", + "responseContent":{ "ietf-netconf-monitoring:netconf-state":{ "schemas":{ "schema":[ @@ -26,20 +26,6 @@ } } } - }, - { - "operationId":"101", - "ids":[ - "456", - "457" - ], - "status-code":101, - "status-message":"cmHandle(s) do not exist", - "data":{ - "error":{ - "message":"cmHandle(s) do not exist" - } - } } ] } -- 2.16.6
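
For reference only (not part of this change): a minimal sketch of how a client application could read the DataOperationEvent that NcmpAsyncDataOperationEventConsumer forwards to the client-specified topic. The topic name 'client-topic', the String key/value deserializers, and the Jackson tree-model parsing are illustrative assumptions; the field names follow data-operation-event-schema-1.0.0.json.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DataOperationResponseReader {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Polls the client topic once and logs each data operation response entry.
     * Assumes the consumer was created with StringDeserializer for both key and value.
     */
    public static void readOnce(final KafkaConsumer<String, String> consumer) throws Exception {
        consumer.subscribe(List.of("client-topic"));
        final ConsumerRecords<String, String> consumedRecords = consumer.poll(Duration.ofMillis(1500));
        for (final ConsumerRecord<String, String> consumedRecord : consumedRecords) {
            // Payload layout per data-operation-event-schema-1.0.0.json:
            // data.responses[] with operationId, ids, statusCode, statusMessage, responseContent
            final JsonNode event = objectMapper.readTree(consumedRecord.value());
            for (final JsonNode response : event.path("data").path("responses")) {
                System.out.printf("operation %s: status %s (%s)%n",
                    response.path("operationId").asText(),
                    response.path("statusCode").asText(),
                    response.path("statusMessage").asText());
            }
        }
    }
}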