DMI Data AVC to use kafka headers 80/134480/14
authormpriyank <priyank.maheshwari@est.tech>
Thu, 4 May 2023 10:24:29 +0000 (11:24 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Wed, 10 May 2023 12:42:12 +0000 (13:42 +0100)
- POC done keeping AvcEvent schema in mind.
- Approach to have header schema per event schema.
- Moved the header information from AvcEvent to separate AvcEventHeader
  schema.
- Added Jsr303 annotation support for required field check

Issue-ID: CPS-1671
Change-Id: I2e4f969e8ca4f6282d1b9aa5fd52d16174a26084
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-dependencies/pom.xml
cps-ncmp-events/pom.xml
cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-headers-v1.json [moved from cps-ncmp-events/src/main/resources/schemas/avc-event-schema-v1.json with 74% similarity]
cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json

index 7050142..e06bbd7 100755 (executable)
                 <artifactId>guava</artifactId>
                 <version>31.1-jre</version>
             </dependency>
+            <dependency>
+                <groupId>javax.validation</groupId>
+                <artifactId>validation-api</artifactId>
+                <version>2.0.1.Final</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>
index 7457eb6..52ca77e 100644 (file)
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
@@ -47,6 +51,7 @@
                     <targetPackage>org.onap.cps.ncmp.event.model</targetPackage>
                     <generateBuilders>true</generateBuilders>
                     <serializable>true</serializable>
+                    <includeJsr303Annotations>true</includeJsr303Annotations>
                 </configuration>
             </plugin>
         </plugins>
@@ -1,11 +1,12 @@
 {
   "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1",
-  "$ref": "#/definitions/AvcEvent",
+  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-headers-schema:v1",
+  "$ref": "#/definitions/AvcEventHeader",
   "definitions": {
-    "AvcEvent": {
-      "description": "The payload for AVC event.",
+    "AvcEventHeader": {
+      "description": "The header for AVC event.",
       "type": "object",
+      "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEventHeader",
       "properties": {
         "eventId": {
           "description": "The unique id identifying the event generated by DMI for this AVC event.",
         "eventSchemaVersion": {
           "description": "The event schema version for AVC events.",
           "type": "string"
-        },
-        "event": {
-        "$ref": "#/definitions/Event"
         }
       },
       "required": [
         "eventId",
         "eventCorrelationId",
-        "eventTime",
-        "eventSource",
         "eventType",
         "eventSchema",
         "eventSchemaVersion"
       ],
       "additionalProperties": false
-    },
-    "Event": {
-      "description": "The AVC event content.",
-      "type": "object",
-      "existingJavaType": "java.lang.Object",
-      "additionalProperties": false
     }
   }
 }
\ No newline at end of file
diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json
new file mode 100644 (file)
index 0000000..407551f
--- /dev/null
@@ -0,0 +1,24 @@
+{
+  "$schema": "https://json-schema.org/draft/2019-09/schema",
+  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1",
+  "$ref": "#/definitions/AvcEvent",
+  "definitions": {
+    "AvcEvent": {
+      "description": "The payload for AVC event.",
+      "type": "object",
+      "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEvent",
+      "properties": {
+        "event": {
+          "description": "The AVC event content.",
+          "type": "object",
+          "existingJavaType": "java.lang.Object",
+          "additionalProperties": false
+        }
+      },
+      "required": [
+        "event"
+      ],
+      "additionalProperties": false
+    }
+  }
+}
\ No newline at end of file
index ec344bb..4c84629 100644 (file)
@@ -22,6 +22,8 @@ package org.onap.cps.ncmp.api.impl.events;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
@@ -40,17 +42,36 @@ public class EventsPublisher<T> {
     private final KafkaTemplate<String, T> eventKafkaTemplate;
 
     /**
-     * LCM Event publisher.
+     * Generic Event publisher.
      *
      * @param topicName valid topic name
      * @param eventKey  message key
-     * @param event message payload
+     * @param event     message payload
      */
+    @Deprecated
     public void publishEvent(final String topicName, final String eventKey, final T event) {
-        final ListenableFuture<SendResult<String, T>> eventFuture =
-                eventKafkaTemplate.send(topicName, eventKey, event);
+        final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event);
+        eventFuture.addCallback(handleCallback(topicName));
+    }
+
+    /**
+     * Generic Event Publisher with headers.
+     *
+     * @param topicName    valid topic name
+     * @param eventKey     message key
+     * @param eventHeaders event headers
+     * @param event        message payload
+     */
+    public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) {
+
+        final ProducerRecord<String, T> producerRecord =
+                new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
+        final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(producerRecord);
+        eventFuture.addCallback(handleCallback(topicName));
+    }
 
-        eventFuture.addCallback(new ListenableFutureCallback<>() {
+    private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
+        return new ListenableFutureCallback<>() {
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage());
@@ -61,6 +82,7 @@ public class EventsPublisher<T> {
                 log.debug("Successfully published event to topic : {} , Event : {}",
                         sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value());
             }
-        });
+        };
     }
+
 }
index 83ad5e5..3bf02f0 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avc;
 
+import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
+import org.springframework.util.SerializationUtils;
 
 /**
  * Listener for AVC events.
@@ -47,16 +52,28 @@ public class AvcEventConsumer {
 
 
     /**
-     * Consume the specified event.
+     * Incoming AvcEvent in the form of Consumer Record.
      *
-     * @param avcEvent the event to be consumed and produced.
+     * @param avcEventConsumerRecord Incoming raw consumer record
      */
-    @KafkaListener(
-            topics = "${app.dmi.cm-events.topic}",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"})
-    public void consumeAndForward(final AvcEvent avcEvent) {
-        log.debug("Consuming AVC event {} ...", avcEvent);
-        final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent);
-        eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent);
+    @KafkaListener(topics = "${app.dmi.cm-events.topic}",
+            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
+    public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+        log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+        final String mutatedEventId = UUID.randomUUID().toString();
+        mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
+        final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
+        eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
+                outgoingAvcEvent);
+    }
+
+    private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
+        final String existingEventId =
+                (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value());
+        eventHeaders.remove("eventId");
+        log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
+                mutatedEventId);
+        eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId)));
+
     }
 }
index 113da0d..8246ed4 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avc;
 
-import java.util.UUID;
 import org.mapstruct.Mapper;
-import org.mapstruct.Mapping;
-import org.mapstruct.Named;
-import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
 
 
 /**
@@ -33,12 +30,6 @@ import org.onap.cps.ncmp.event.model.AvcEvent;
 @Mapper(componentModel = "spring")
 public interface AvcEventMapper {
 
-    @Mapping(source = "eventId", target = "eventId", qualifiedByName = "avcEventId")
     AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent);
 
-    @Named("avcEventId")
-    static String getAvcEventId(String eventId) {
-        return UUID.randomUUID().toString();
-    }
-
 }
index d57527a..5f54bbe 100644 (file)
@@ -1,6 +1,6 @@
 /*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2023 Nordix Foundation.
+ *  ============LICENSE_START=======================================================
+ *  Copyright (c) 2023 Nordix Foundation.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 package org.onap.cps.ncmp.api.impl.events.avc
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.header.internals.RecordHeader
 import org.mapstruct.factory.Mappers
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.event.model.AvcEvent
+import org.onap.cps.ncmp.events.avc.v1.AvcEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
+import org.springframework.util.SerializationUtils
 import org.testcontainers.spock.Testcontainers
 
 import java.time.Duration
@@ -63,23 +66,33 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
         and: 'an event is sent'
             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
+        and: 'event has header information'
+            def consumerRecord = new ConsumerRecord<String,AvcEvent>(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent)
+            consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid')))
+            consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1')))
         when: 'the event is consumed'
-            acvEventConsumer.consumeAndForward(testEventSent)
+            acvEventConsumer.consumeAndForward(consumerRecord)
         and: 'the topic is polled'
             def records = kafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'record can be converted to AVC event'
             def record = records.iterator().next()
-            def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent)
-        and: 'consumed forwarded NCMP event id differs from DMI event id'
-            assert testEventSent.eventId != convertedAvcEvent.getEventId()
-        and: 'correlation id matches'
-            assert testEventSent.eventCorrelationId == convertedAvcEvent.getEventCorrelationId()
-        and: 'timestamps match'
-            assert testEventSent.eventTime == convertedAvcEvent.getEventTime()
-        and: 'target matches'
-            assert testEventSent.eventSource == convertedAvcEvent.getEventSource()
+            def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class)
+        and: 'we have correct headers forwarded where correlation id matches'
+            record.headers().forEach(header -> {
+                if (header.key().equals('eventCorrelationId')) {
+                    assert SerializationUtils.deserialize(header.value()) == 'cmhandle1'
+                }
+            })
+        and: 'event id differs(as per requirement) between consumed and forwarded'
+            record.headers().forEach(header -> {
+                if (header.key().equals('eventId')) {
+                    assert SerializationUtils.deserialize(header.value()) != 'sample-eventid'
+                }
+            })
+        and: 'the event payload still matches'
+            assert testEventSent == convertedAvcEvent
     }
 
 }
\ No newline at end of file
index bda2b4e..de8a523 100644 (file)
@@ -1,11 +1,4 @@
 {
-  "eventId": "4cb32729-85e3-44d1-aa6e-c923b9b059a5",
-  "eventCorrelationId": "68f15800-8ed4-4bae-9e53-27a9e03e1911",
-  "eventTime": "2022-12-12T14:29:23.876+0000",
-  "eventSource": "NCMP",
-  "eventType": "org.onap.cps.ncmp.event.model.AvcEvent",
-  "eventSchema": "urn:cps:org.onap.cps.ncmp.event.model.AvcEvent",
-  "eventSchemaVersion": "v1",
   "event": {
     "payload": "Hello world!"
   }