X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2FEventsPublisher.java;h=4c846293047452589b911ccf60d94842ee408ef0;hb=e626c9661fd88a585b50dafab5f5542784690143;hp=ec344bbaeea467426122f90c872910ed8449e65e;hpb=36d6e83473846cd4e0ad5e42ad6a8d0e3e3b3425;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index ec344bbae..4c8462930 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -22,6 +22,8 @@ package org.onap.cps.ncmp.api.impl.events; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -40,17 +42,36 @@ public class EventsPublisher { private final KafkaTemplate eventKafkaTemplate; /** - * LCM Event publisher. + * Generic Event publisher. * * @param topicName valid topic name * @param eventKey message key - * @param event message payload + * @param event message payload */ + @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture = - eventKafkaTemplate.send(topicName, eventKey, event); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { + + final ProducerRecord producerRecord = + new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); + final ListenableFuture> eventFuture = eventKafkaTemplate.send(producerRecord); + eventFuture.addCallback(handleCallback(topicName)); + } - eventFuture.addCallback(new ListenableFutureCallback<>() { + private ListenableFutureCallback> handleCallback(final String topicName) { + return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); @@ -61,6 +82,7 @@ public class EventsPublisher { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } - }); + }; } + }