DMI Data AVC to cloud events
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumer.java
index f37497a..b5ca176 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avc;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
 import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeader;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
-import org.springframework.util.SerializationUtils;
 
 /**
  * Listener for AVC events.
@@ -47,34 +45,19 @@ public class AvcEventConsumer {
     @Value("${app.ncmp.avc.cm-events-topic}")
     private String cmEventsTopicName;
 
-    private final EventsPublisher<AvcEvent> eventsPublisher;
-    private final AvcEventMapper avcEventMapper;
-
+    private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
      * Incoming AvcEvent in the form of Consumer Record.
      *
      * @param avcEventConsumerRecord Incoming raw consumer record
      */
-    @KafkaListener(topics = "${app.dmi.cm-events.topic}",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
-    public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+    @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+    public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
         log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
-        final String mutatedEventId = UUID.randomUUID().toString();
-        mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
-        final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
-        eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
-                outgoingAvcEvent);
-    }
-
-    private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
-        final String eventId = "eventId";
-        final String existingEventId =
-                (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value());
-        eventHeaders.remove(eventId);
-        log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
-                mutatedEventId);
-        eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId)));
-
+        final String newEventId = UUID.randomUUID().toString();
+        final CloudEvent outgoingAvcEvent =
+                CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+        eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
     }
 }