2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.api.impl.events
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import io.cloudevents.CloudEvent
28 import java.util.concurrent.CompletableFuture
29 import org.apache.kafka.clients.producer.ProducerRecord
30 import org.apache.kafka.clients.producer.RecordMetadata
31 import org.apache.kafka.common.TopicPartition
32 import org.junit.jupiter.api.AfterEach
33 import org.junit.jupiter.api.BeforeEach
34 import org.slf4j.LoggerFactory
35 import org.springframework.kafka.core.KafkaTemplate
36 import org.springframework.kafka.support.SendResult
37 import spock.lang.Specification
39 class EventsPublisherSpec extends Specification {
41 def legacyKafkaTemplateStub = Stub(KafkaTemplate)
42 def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
43 def logger = Spy(ListAppender<ILoggingEvent>)
47 def setupLogger = ((Logger) LoggerFactory.getLogger(EventsPublisher.class))
48 setupLogger.setLevel(Level.DEBUG)
49 setupLogger.addAppender(logger)
55 ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders()
58 def objectUnderTest = new EventsPublisher(legacyKafkaTemplateStub, mockCloudEventKafkaTemplate)
60 def 'Publish Cloud Event'() {
61 given: 'a successfully published event'
62 def eventFuture = CompletableFuture.completedFuture(
64 new ProducerRecord('some-topic', 'some-value'),
65 new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
68 def someCloudEvent = Mock(CloudEvent)
69 1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFuture
70 when: 'publishing the cloud event'
71 objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
72 then: 'the correct debug message is logged'
73 def lastLoggingEvent = logger.list[0]
74 assert lastLoggingEvent.level == Level.DEBUG
75 assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
78 def 'Publish Cloud Event with Exception'() {
79 given: 'a failed event'
80 def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
81 eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception'))
82 def someCloudEvent = Mock(CloudEvent)
83 1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
84 when: 'publishing the cloud event'
85 objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
86 then: 'the correct error message is logged'
87 def lastLoggingEvent = logger.list[0]
88 assert lastLoggingEvent.level == Level.ERROR
89 assert lastLoggingEvent.formattedMessage.contains('Unable to publish event')
92 def 'Handle Legacy Event Callback'() {
93 given: 'an event is successfully published'
94 def eventFuture = CompletableFuture.completedFuture(
96 new ProducerRecord('some-topic', 'some-value'),
97 new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
100 when: 'handling legacy event callback'
101 objectUnderTest.handleLegacyEventCallback('some-topic', eventFuture)
102 then: 'the correct debug message is logged'
103 def lastLoggingEvent = logger.list[0]
104 assert lastLoggingEvent.level == Level.DEBUG
105 assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
108 def 'Handle Legacy Event Callback with Exception'() {
109 given: 'a failure to publish an event'
110 def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
111 eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception'))
112 when: 'handling legacy event callback'
113 objectUnderTest.handleLegacyEventCallback('some-topic', eventFutureWithFailure)
114 then: 'the correct error message is logged'
115 def lastLoggingEvent = logger.list[0]
116 assert lastLoggingEvent.level == Level.ERROR
117 assert lastLoggingEvent.formattedMessage.contains('Unable to publish event')