Merge "Improve performance of updateDataLeaves"
cps.git: cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
index 995a4d5..4a0ec5c 100644
 
 package org.onap.cps.ncmp.api.impl.async;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -39,7 +39,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class NcmpAsyncDataOperationEventConsumer {
 
-    private final EventsPublisher<DataOperationEvent> eventsPublisher;
+    private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
      * Consume the DataOperationResponseEvent published by the producer to the topic 'async-m2m.topic'
@@ -52,14 +52,12 @@ public class NcmpAsyncDataOperationEventConsumer {
             filter = "includeDataOperationEventsOnly",
             groupId = "ncmp-data-operation-event-group",
             properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
-    public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
-                                              dataOperationEventConsumerRecord) {
+    public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
         log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
-        final String eventTarget = SerializationUtils
-                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
-        final String eventId = SerializationUtils
-                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
-        eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
-                dataOperationEventConsumerRecord.value());
+        final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
+                dataOperationEventConsumerRecord.headers(), "ce_destination");
+        final String eventId = KafkaHeaders.getParsedKafkaHeader(
+                dataOperationEventConsumerRecord.headers(), "ce_id");
+        eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value());
     }
 }
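
The key change above is how event metadata is read. The old code Java-deserialized custom "eventTarget"/"eventId" headers with SerializationUtils, whereas CloudEvents in Kafka binary mode carry their attributes as plain UTF-8 headers prefixed with "ce_". Below is a minimal, self-contained sketch of that lookup; the header values are illustrative, and "destination" is assumed to be an extension attribute set by the producing side rather than a core CloudEvents attribute.

import java.nio.charset.StandardCharsets;

import io.cloudevents.kafka.impl.KafkaHeaders;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class CloudEventHeaderSketch {

    public static void main(final String[] args) {
        // In CloudEvents Kafka binary mode the producer writes each event
        // attribute as a plain UTF-8 header named "ce_<attribute>".
        final RecordHeaders headers = new RecordHeaders();
        headers.add("ce_id", "sample-event-id".getBytes(StandardCharsets.UTF_8));
        // Assumption: "destination" is an extension attribute set by the producer.
        headers.add("ce_destination", "client-topic".getBytes(StandardCharsets.UTF_8));

        // getParsedKafkaHeader decodes the raw header bytes back into a String;
        // no Java serialization is involved, unlike the old SerializationUtils path.
        final String eventId = KafkaHeaders.getParsedKafkaHeader(headers, "ce_id");
        final String eventTarget = KafkaHeaders.getParsedKafkaHeader(headers, "ce_destination");

        System.out.println(eventId + " -> " + eventTarget);
    }
}

Note that io.cloudevents.kafka.impl.KafkaHeaders lives in an "impl" package of the cloudevents-kafka library, so it is a convenience rather than a guaranteed public API.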
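On the publishing side, publishEvent(target, id, headers, event) becomes publishCloudEvent(target, id, event) with no separate headers argument, because a CloudEvent already carries its attributes and the CloudEvents value serializer re-emits them as ce_ headers on send. The following is a hypothetical minimal shape of such a publisher, assuming it wraps a Spring KafkaTemplate; the real EventsPublisher in cps may differ.

import io.cloudevents.CloudEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class CloudEventPublisherSketch {

    // Assumption: configured with io.cloudevents.kafka.CloudEventSerializer as
    // the value serializer, which writes the ce_ headers when the record is sent.
    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;

    public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
        // No explicit Headers parameter: the event's id, source, type and
        // extensions travel inside the CloudEvent itself.
        cloudEventKafkaTemplate.send(topicName, eventKey, event);
        log.debug("Published CloudEvent with key {} to topic {}", eventKey, topicName);
    }
}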
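The filter = "includeDataOperationEventsOnly" attribute on @KafkaListener refers to a RecordFilterStrategy bean of that name (supported since Spring Kafka 2.8.4). A hedged sketch of how such a bean could discriminate on the CloudEvent type header follows; the bean name comes from the listener above, but the exact matching logic is an assumption.

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.impl.KafkaHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

@Configuration
public class DataOperationEventFilterConfig {

    @Bean
    public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
        // RecordFilterStrategy#filter returns true when a record should be
        // DISCARDED, so keep only records whose ce_type mentions DataOperationEvent.
        return consumerRecord -> {
            final String eventType =
                    KafkaHeaders.getParsedKafkaHeader(consumerRecord.headers(), "ce_type");
            return eventType == null || !eventType.contains("DataOperationEvent");
        };
    }
}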