DMI Data AVC to cloud events 38/134938/2
authormpriyank <priyank.maheshwari@est.tech>
Fri, 16 Jun 2023 14:55:52 +0000 (15:55 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Mon, 19 Jun 2023 10:54:30 +0000 (11:54 +0100)
- DMI Data AVC to be consumed as CloudEvents now
- Removed the schema header as it is taken care by cloudevent headers
- Implemented naming and packaging comments on the schema
- Test cases refactoring

Issue-ID: CPS-1719
Change-Id: I9864f90446720fe903fb3c1502d86035d886751d
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json [deleted file]
cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json [moved from cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json with 92% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java [deleted file]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json

diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json
deleted file mode 100644 (file)
index ea1e617..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-{
-  "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-header-schema:v1",
-  "$ref": "#/definitions/AvcEventHeader",
-  "definitions": {
-    "AvcEventHeader": {
-      "description": "The header for AVC event.",
-      "type": "object",
-      "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEventHeader",
-      "properties": {
-        "eventId": {
-          "description": "The unique id identifying the event generated by DMI for this AVC event.",
-          "type": "string"
-        },
-        "eventCorrelationId": {
-          "description": "The request id passed by NCMP for this AVC event.",
-          "type": "string"
-        },
-        "eventTime": {
-          "description": "The time of the AVC event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.",
-          "type": "string"
-        },
-        "eventSource": {
-          "description": "The source of the AVC event.",
-          "type": "string"
-        },
-        "eventType": {
-          "description": "The type of the AVC event.",
-          "type": "string"
-        },
-        "eventSchema": {
-          "description": "The event schema for AVC events.",
-          "type": "string"
-        },
-        "eventSchemaVersion": {
-          "description": "The event schema version for AVC events.",
-          "type": "string"
-        }
-      },
-      "required": [
-        "eventId",
-        "eventCorrelationId",
-        "eventType",
-        "eventSchema",
-        "eventSchemaVersion"
-      ],
-      "additionalProperties": false
-    }
-  }
-}
\ No newline at end of file
@@ -1,10 +1,9 @@
 {
   "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1",
+  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:1.0.0",
   "$ref": "#/definitions/AvcEvent",
   "definitions": {
     "Edit": {
-      "javaType": "org.onap.cps.ncmp.events.avc.v1.Edit",
       "additionalProperties": false,
       "properties": {
         "edit-id": {
@@ -48,9 +47,9 @@
     "AvcEvent": {
       "description": "The payload for AVC event.",
       "type": "object",
-      "javaType": "org.onap.cps.ncmp.events.avc.v1.AvcEvent",
+      "javaType": "org.onap.cps.ncmp.events.avc1_0_0.AvcEvent",
       "properties": {
-        "event": {
+        "data": {
           "description": "The AVC event content compliant with RFC8641 format",
           "type": "object",
           "additionalProperties": false,
@@ -99,7 +98,7 @@
         }
       },
       "required": [
-        "event"
+        "data"
       ],
       "additionalProperties": false
     }
index 7b28b4c..e61e772 100644 (file)
@@ -50,6 +50,19 @@ public class EventsPublisher<T> {
 
     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
 
+    /**
+     * Generic CloudEvent publisher.
+     *
+     * @param topicName valid topic name
+     * @param eventKey  message key
+     * @param event     message payload
+     */
+    public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
+        final ListenableFuture<SendResult<String, CloudEvent>> eventFuture
+                = cloudEventKafkaTemplate.send(topicName, eventKey, event);
+        eventFuture.addCallback(handleCallback(topicName));
+    }
+
     /**
      * Generic Event publisher.
      *
@@ -95,7 +108,7 @@ public class EventsPublisher<T> {
         publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
     }
 
-    private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
+    private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) {
         return new ListenableFutureCallback<>() {
             @Override
             public void onFailure(final Throwable throwable) {
@@ -103,7 +116,7 @@ public class EventsPublisher<T> {
             }
 
             @Override
-            public void onSuccess(final SendResult<String, T> sendResult) {
+            public void onSuccess(final SendResult<String, ?> sendResult) {
                 log.debug("Successfully published event to topic : {} , Event : {}",
                         sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value());
             }
index f37497a..b5ca176 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avc;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
 import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeader;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
-import org.springframework.util.SerializationUtils;
 
 /**
  * Listener for AVC events.
@@ -47,34 +45,19 @@ public class AvcEventConsumer {
     @Value("${app.ncmp.avc.cm-events-topic}")
     private String cmEventsTopicName;
 
-    private final EventsPublisher<AvcEvent> eventsPublisher;
-    private final AvcEventMapper avcEventMapper;
-
+    private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
      * Incoming AvcEvent in the form of Consumer Record.
      *
      * @param avcEventConsumerRecord Incoming raw consumer record
      */
-    @KafkaListener(topics = "${app.dmi.cm-events.topic}",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
-    public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+    @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+    public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
         log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
-        final String mutatedEventId = UUID.randomUUID().toString();
-        mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
-        final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
-        eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
-                outgoingAvcEvent);
-    }
-
-    private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
-        final String eventId = "eventId";
-        final String existingEventId =
-                (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value());
-        eventHeaders.remove(eventId);
-        log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
-                mutatedEventId);
-        eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId)));
-
+        final String newEventId = UUID.randomUUID().toString();
+        final CloudEvent outgoingAvcEvent =
+                CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+        eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
     }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
deleted file mode 100644 (file)
index 8246ed4..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.avc;
-
-import org.mapstruct.Mapper;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
-
-
-/**
- * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}.
- */
-@Mapper(componentModel = "spring")
-public interface AvcEventMapper {
-
-    AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent);
-
-}
index 3dffac7..4a9e3ee 100644 (file)
 package org.onap.cps.ncmp.api.impl.events.avc
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.mapstruct.factory.Mappers
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
-import org.springframework.util.SerializationUtils
 import org.testcontainers.spock.Testcontainers
 
 import java.time.Duration
@@ -46,52 +48,49 @@ import java.time.Duration
 class AvcEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class)
+    EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
-    EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
-
-    @SpringBean
-    AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper)
+    AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
-    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
+    @Autowired
+    ObjectMapper objectMapper
+
+    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
 
     def 'Consume and forward valid message'() {
         given: 'consumer has a subscription on a topic'
             def cmEventsTopicName = 'cm-events'
             acvEventConsumer.cmEventsTopicName = cmEventsTopicName
-            legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
+            cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
         and: 'an event is sent'
             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
+            def testCloudEventSent = CloudEventBuilder.v1()
+                .withData(objectMapper.writeValueAsBytes(testEventSent))
+                .withId('sample-eventid')
+                .withType('sample-test-type')
+                .withSource(URI.create('sample-test-source'))
+                .withExtension('correlationid', 'test-cmhandle1').build()
         and: 'event has header information'
-            def consumerRecord = new ConsumerRecord<String,AvcEvent>(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent)
-            consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid')))
-            consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1')))
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
         when: 'the event is consumed'
             acvEventConsumer.consumeAndForward(consumerRecord)
         and: 'the topic is polled'
-            def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
+            def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'record can be converted to AVC event'
             def record = records.iterator().next()
-            def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class)
+            def cloudevent = record.value() as CloudEvent
+            def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
         and: 'we have correct headers forwarded where correlation id matches'
-            record.headers().forEach(header -> {
-                if (header.key().equals('eventCorrelationId')) {
-                    assert SerializationUtils.deserialize(header.value()) == 'cmhandle1'
-                }
-            })
+            assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
         and: 'event id differs(as per requirement) between consumed and forwarded'
-            record.headers().forEach(header -> {
-                if (header.key().equals('eventId')) {
-                    assert SerializationUtils.deserialize(header.value()) != 'sample-eventid'
-                }
-            })
+            assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid'
         and: 'the event payload still matches'
             assert testEventSent == convertedAvcEvent
     }
index 569343f..5b297c8 100644 (file)
@@ -1,5 +1,5 @@
 {
-  "event":{
+  "data":{
     "push-change-update":{
       "datastore-changes":{
         "ietf-yang-patch:yang-patch":{