X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2FEventsPublisher.java;h=355e5cdf799e0f4fbe4570dcf750e17d5e6718ee;hb=c29a5fe32db7efb0fdefa3a3d020110aab9731b4;hp=4c9064852169cb4229e4be7766e91fe737f362ed;hpb=d43e45982198017702349a1ff3aba11b91d3215d;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 4c9064852..355e5cdf7 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events; import io.cloudevents.CloudEvent; import java.util.Map; +import java.util.concurrent.CompletableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; @@ -31,8 +32,6 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; /** * EventsPublisher to publish events. @@ -61,9 +60,17 @@ public class EventsPublisher { * @param event message payload */ public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { - final ListenableFuture> eventFuture - = cloudEventKafkaTemplate.send(topicName, eventKey, event); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture> eventFuture = + cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.whenComplete((result, e) -> { + if (e == null) { + log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(), + result.getProducerRecord().value()); + + } else { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + } + }); } /** @@ -76,9 +83,9 @@ public class EventsPublisher { */ @Deprecated(forRemoval = true) public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture - = legacyKafkaEventTemplate.send(topicName, eventKey, event); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture> eventFuture = + legacyKafkaEventTemplate.send(topicName, eventKey, event); + handleLegacyEventCallback(topicName, eventFuture); } /** @@ -93,8 +100,8 @@ public class EventsPublisher { final ProducerRecord producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final ListenableFuture> eventFuture = legacyKafkaEventTemplate.send(producerRecord); - eventFuture.addCallback(handleCallback(topicName)); + final CompletableFuture> eventFuture = legacyKafkaEventTemplate.send(producerRecord); + handleLegacyEventCallback(topicName, eventFuture); } /** @@ -111,19 +118,16 @@ public class EventsPublisher { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback> handleCallback(final String topicName) { - return new ListenableFutureCallback<>() { - @Override - public void onFailure(final Throwable throwable) { - log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); - } - - @Override - public void onSuccess(final SendResult sendResult) { - log.debug("Successfully published event to topic : {} , Event : {}", - sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); + private void handleLegacyEventCallback(final String topicName, + final CompletableFuture> eventFuture) { + eventFuture.whenComplete((result, e) -> { + if (e != null) { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); + } else { + log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(), + result.getProducerRecord().value()); } - }; + }); } private Headers convertToKafkaHeaders(final Map eventMessageHeaders) {