Merge "Fix test-deregistration script"
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumer.java
index 3b5b5aa..b5ca176 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avc;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -36,18 +41,23 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class AvcEventConsumer {
 
-    private final AvcEventProducer avcEventProducer;
+
+    @Value("${app.ncmp.avc.cm-events-topic}")
+    private String cmEventsTopicName;
+
+    private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
-     * Consume the specified event.
+     * Incoming AvcEvent in the form of Consumer Record.
      *
-     * @param avcEvent the event to be consumed and produced.
+     * @param avcEventConsumerRecord Incoming raw consumer record
      */
-    @KafkaListener(
-            topics = "${app.dmi.cm-events.topic}",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"})
-    public void consumeAndForward(final AvcEvent avcEvent) {
-        log.debug("Consuming AVC event {} ...", avcEvent);
-        avcEventProducer.sendMessage(avcEvent);
+    @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+    public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
+        log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+        final String newEventId = UUID.randomUUID().toString();
+        final CloudEvent outgoingAvcEvent =
+                CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+        eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
     }
 }