X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2FEventsPublisher.java;h=49e455e5801e316eeed6d876988aa0050557e35f;hb=477bd46874d2c8266007358f9ca335b0442cea18;hp=4c9064852169cb4229e4be7766e91fe737f362ed;hpb=0af82141cb75a04a16248e17d4e915a580e82300;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 4c9064852..49e455e58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events; import io.cloudevents.CloudEvent; import java.util.Map; +import java.util.concurrent.CompletableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; @@ -31,8 +32,6 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; /** * EventsPublisher to publish events. @@ -61,9 +60,17 @@ public class EventsPublisher { * @param event message payload */ public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { - final ListenableFuture> eventFuture - = cloudEventKafkaTemplate.send(topicName, eventKey, event); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture> eventFuture = + cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.whenComplete((result, e) -> { + if (e == null) { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + + } else { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + } + }); } /** @@ -76,9 +83,16 @@ public class EventsPublisher { */ @Deprecated(forRemoval = true) public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture - = legacyKafkaEventTemplate.send(topicName, eventKey, event); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture> eventFuture = + legacyKafkaEventTemplate.send(topicName, eventKey, event); + eventFuture.whenComplete((result, e) -> { + if (e == null) { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + } else { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + } + }); } /** @@ -93,8 +107,16 @@ public class EventsPublisher { final ProducerRecord producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final ListenableFuture> eventFuture = legacyKafkaEventTemplate.send(producerRecord); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture> eventFuture = + legacyKafkaEventTemplate.send(producerRecord); + eventFuture.whenComplete((result, ex) -> { + if (ex != null) { + log.error("Unable to publish event to topic : {} due to {}", topicName, ex.getMessage()); + } else { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + } + }); } /** @@ -111,21 +133,6 @@ public class EventsPublisher { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback> handleCallback(final String topicName) { - return new ListenableFutureCallback<>() { - @Override - public void onFailure(final Throwable throwable) { - log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); - } - - @Override - public void onSuccess(final SendResult sendResult) { - log.debug("Successfully published event to topic : {} , Event : {}", - sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); - } - }; - } - private Headers convertToKafkaHeaders(final Map eventMessageHeaders) { final Headers eventHeaders = new RecordHeaders(); eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value)));