- private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) {
- return new ListenableFutureCallback<>() {
- @Override
- public void onFailure(final Throwable throwable) {
- log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage());
- }
-
- @Override
- public void onSuccess(final SendResult<String, ?> sendResult) {
- log.debug("Successfully published event to topic : {} , Event : {}",
- sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value());
+ private void handleLegacyEventCallback(final String topicName,
+ final CompletableFuture<SendResult<String, T>> eventFuture) {
+ eventFuture.whenComplete((result, e) -> {
+ if (e != null) {
+ log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
+ } else {
+ log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
+ result.getProducerRecord().value());