DMI Data AVC to use kafka headers
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumer.java
index 83ad5e5..3bf02f0 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avc;
 
+import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
+import org.springframework.util.SerializationUtils;
 
 /**
  * Listener for AVC events.
@@ -47,16 +52,28 @@ public class AvcEventConsumer {
 
 
     /**
-     * Consume the specified event.
+     * Incoming AvcEvent in the form of Consumer Record.
      *
-     * @param avcEvent the event to be consumed and produced.
+     * @param avcEventConsumerRecord Incoming raw consumer record
      */
-    @KafkaListener(
-            topics = "${app.dmi.cm-events.topic}",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"})
-    public void consumeAndForward(final AvcEvent avcEvent) {
-        log.debug("Consuming AVC event {} ...", avcEvent);
-        final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent);
-        eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent);
+    @KafkaListener(topics = "${app.dmi.cm-events.topic}",
+            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
+    public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+        log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+        final String mutatedEventId = UUID.randomUUID().toString();
+        mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
+        final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
+        eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
+                outgoingAvcEvent);
+    }
+
+    private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
+        final String existingEventId =
+                (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value());
+        eventHeaders.remove("eventId");
+        log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
+                mutatedEventId);
+        eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId)));
+
     }
 }