X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2FEventsPublisher.java;h=58d0d2d48f2d050e41972086805151f785e7f738;hb=a01f8861a84931f4bdf2d69fa05a793afabc22e0;hp=ec344bbaeea467426122f90c872910ed8449e65e;hpb=36d6e83473846cd4e0ad5e42ad6a8d0e3e3b3425;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index ec344bbae..58d0d2d48 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -20,13 +20,18 @@ package org.onap.cps.ncmp.api.impl.events; +import io.cloudevents.CloudEvent; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.SerializationUtils; /** * EventsPublisher to publish events. @@ -37,30 +42,102 @@ import org.springframework.util.concurrent.ListenableFutureCallback; @RequiredArgsConstructor public class EventsPublisher { - private final KafkaTemplate eventKafkaTemplate; + /** + * KafaTemplate for legacy (non-cloud) events. + * + * @deprecated Cloud events should be used. Will address soon as part of https://jira.onap.org/browse/CPS-1717 + */ + @Deprecated(forRemoval = true) + private final KafkaTemplate legacyKafkaEventTemplate; + + private final KafkaTemplate cloudEventKafkaTemplate; /** - * LCM Event publisher. + * Generic CloudEvent publisher. * * @param topicName valid topic name * @param eventKey message key - * @param event message payload + * @param event message payload */ - public void publishEvent(final String topicName, final String eventKey, final T event) { - final ListenableFuture> eventFuture = - eventKafkaTemplate.send(topicName, eventKey, event); + public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { + final CompletableFuture> eventFuture = + cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.whenComplete((result, e) -> { + if (e == null) { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); - eventFuture.addCallback(new ListenableFutureCallback<>() { - @Override - public void onFailure(final Throwable throwable) { - log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); + } else { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); } + }); + } - @Override - public void onSuccess(final SendResult sendResult) { + /** + * Generic Event publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param event message payload + * @deprecated Cloud events should be used. Will address soon as part of https://jira.onap.org/browse/CPS-1717 + */ + @Deprecated(forRemoval = true) + public void publishEvent(final String topicName, final String eventKey, final T event) { + final CompletableFuture> eventFuture = + legacyKafkaEventTemplate.send(topicName, eventKey, event); + eventFuture.whenComplete((result, e) -> { + if (e == null) { log.debug("Successfully published event to topic : {} , Event : {}", - sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + } else { + log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage()); } }); } + + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { + + final ProducerRecord producerRecord = + new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); + final CompletableFuture> eventFuture = + legacyKafkaEventTemplate.send(producerRecord); + eventFuture.whenComplete((result, ex) -> { + if (ex != null) { + log.error("Unable to publish event to topic : {} due to {}", topicName, ex.getMessage()); + } else { + log.debug("Successfully published event to topic : {} , Event : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + } + }); + } + + /** + * Generic Event Publisher with headers. + * + * @param topicName valid topic name + * @param eventKey message key + * @param eventHeaders map of event headers + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final Map eventHeaders, + final T event) { + + publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); + } + + + private Headers convertToKafkaHeaders(final Map eventMessageHeaders) { + final Headers eventHeaders = new RecordHeaders(); + eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value))); + return eventHeaders; + } + }