import lombok.extern.slf4j.Slf4j;
import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.KafkaException;
import org.springframework.stereotype.Service;
/**
*/
public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) {
if (notificationsEnabled) {
- lcmEventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent);
+ try {
+ lcmEventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent);
+ } catch (final KafkaException e) {
+ log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage());
+ }
} else {
log.debug("Notifications disabled.");
}
package org.onap.cps.ncmp.api.impl.event.lcm
import org.onap.ncmp.cmhandle.event.lcm.LcmEvent
+import org.springframework.kafka.KafkaException
import spock.lang.Specification
class LcmEventsServiceSpec extends Specification {
'disabled' | false || 0
}
+ def 'Unable to send message'(){
+ given: 'a cm handle id and Lcm Event and notification enabled'
+ def cmHandleId = 'test-cm-handle-id'
+ def lcmEvent = new LcmEvent(eventId: UUID.randomUUID().toString(), eventCorrelationId: cmHandleId)
+ objectUnderTest.notificationsEnabled = true
+ when: 'publisher set to throw an exception'
+ mockLcmEventsPublisher.publishEvent(*_) >> { throw new KafkaException('publishing failed')}
+ and: 'an event is publised'
+ objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent)
+ then: 'the exception is just logged and not bubbled up'
+ noExceptionThrown()
+ }
+
}