Merge "Fix test-deregistration script"
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumer.java
index 83ad5e5..b5ca176 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avc;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.event.model.AvcEvent;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -42,21 +45,19 @@ public class AvcEventConsumer {
     @Value("${app.ncmp.avc.cm-events-topic}")
     private String cmEventsTopicName;
 
-    private final EventsPublisher<AvcEvent> eventsPublisher;
-    private final AvcEventMapper avcEventMapper;
-
+    private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
-     * Consume the specified event.
+     * Incoming AvcEvent in the form of Consumer Record.
      *
-     * @param avcEvent the event to be consumed and produced.
+     * @param avcEventConsumerRecord Incoming raw consumer record
      */
-    @KafkaListener(
-            topics = "${app.dmi.cm-events.topic}",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"})
-    public void consumeAndForward(final AvcEvent avcEvent) {
-        log.debug("Consuming AVC event {} ...", avcEvent);
-        final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent);
-        eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent);
+    @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+    public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
+        log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+        final String newEventId = UUID.randomUUID().toString();
+        final CloudEvent outgoingAvcEvent =
+                CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+        eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
     }
 }