Merge "Omitting the basic auth header in healthcheck test"
[cps.git] / cps-service / src / test / groovy / org / onap / cps / events / EventsPublisherSpec.groovy
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2024 Nordix Foundation
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.events
22
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import io.cloudevents.CloudEvent
28 import org.apache.kafka.clients.producer.ProducerRecord
29 import org.apache.kafka.clients.producer.RecordMetadata
30 import org.apache.kafka.common.TopicPartition
31 import org.apache.kafka.common.header.Headers
32 import org.apache.kafka.common.header.internals.RecordHeader
33 import org.apache.kafka.common.header.internals.RecordHeaders
34 import org.junit.jupiter.api.AfterEach
35 import org.junit.jupiter.api.BeforeEach
36 import org.slf4j.LoggerFactory
37 import org.springframework.kafka.core.KafkaTemplate
38 import org.springframework.kafka.support.SendResult
39 import org.springframework.util.SerializationUtils
40 import spock.lang.Specification
41
42 import java.util.concurrent.CompletableFuture
43
/**
 * Tests for EventsPublisher covering cloud-event and legacy-event publishing,
 * send-callback handling, and kafka-header conversion.
 * Log output of EventsPublisher is captured with a logback ListAppender so each
 * feature can assert on the success/failure message that was logged.
 */
class EventsPublisherSpec extends Specification {

    def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
    def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
    def logger = Spy(ListAppender<ILoggingEvent>)

    def objectUnderTest = new EventsPublisher(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate)

    void setup() {
        // Attach the list appender at DEBUG level so success messages (logged at DEBUG) are captured too.
        def setupLogger = ((Logger) LoggerFactory.getLogger(EventsPublisher.class))
        setupLogger.setLevel(Level.DEBUG)
        setupLogger.addAppender(logger)
        logger.start()
    }

    void cleanup() {
        // Detach the appender so captured events do not leak between specifications.
        ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders()
    }

    def 'Publish Cloud Event'() {
        given: 'a successfully published event'
            def eventFuture = successfullyCompletedSendFuture()
            def someCloudEvent = Mock(CloudEvent)
            1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFuture
        when: 'publishing the cloud event'
            objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
        then: 'the correct debug message is logged'
            assertFirstLoggedEvent(Level.DEBUG, 'Successfully published event')
    }

    def 'Publish Cloud Event with Exception'() {
        given: 'a failed event'
            def eventFutureWithFailure = exceptionallyCompletedSendFuture()
            def someCloudEvent = Mock(CloudEvent)
            1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
        when: 'publishing the cloud event'
            objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
        then: 'the correct error message is logged'
            assertFirstLoggedEvent(Level.ERROR, 'Unable to publish event')
    }

    def 'Publish Legacy Event'() {
        given: 'a successfully published event'
            def eventFuture = successfullyCompletedSendFuture()
            def someEvent = Mock(Object)
            1 * mockLegacyKafkaTemplate.send('some-topic', 'some-event-key', someEvent) >> eventFuture
        when: 'publishing the legacy event'
            objectUnderTest.publishEvent('some-topic', 'some-event-key', someEvent)
        then: 'the correct debug message is logged'
            assertFirstLoggedEvent(Level.DEBUG, 'Successfully published event')
    }

    def 'Publish Legacy Event with Headers as Map'() {
        given: 'a successfully published event'
            def sampleEventHeaders = ['k1': SerializationUtils.serialize('v1')]
            def eventFuture = successfullyCompletedSendFuture()
            def someEvent = Mock(Object)
        when: 'publishing the legacy event'
            objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
        then: 'event is published'
            1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
        and: 'the correct debug message is logged'
            assertFirstLoggedEvent(Level.DEBUG, 'Successfully published event')
    }

    def 'Publish Legacy Event with Record Headers'() {
        given: 'a successfully published event'
            def sampleEventHeaders = new RecordHeaders([new RecordHeader('k1', SerializationUtils.serialize('v1'))])
            def sampleProducerRecord = new ProducerRecord('some-topic', null, 'some-key', 'some-value', sampleEventHeaders)
            def eventFuture = successfullyCompletedSendFuture(sampleProducerRecord)
            def someEvent = Mock(Object)
        when: 'publishing the legacy event'
            objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
        then: 'event is published'
            1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
        and: 'the correct debug message is logged'
            assertFirstLoggedEvent(Level.DEBUG, 'Successfully published event')
    }

    def 'Handle Legacy Event Callback'() {
        given: 'an event is successfully published'
            def eventFuture = successfullyCompletedSendFuture()
        when: 'handling legacy event callback'
            objectUnderTest.handleLegacyEventCallback('some-topic', eventFuture)
        then: 'the correct debug message is logged'
            assertFirstLoggedEvent(Level.DEBUG, 'Successfully published event')
    }

    def 'Handle Legacy Event Callback with Exception'() {
        given: 'a failure to publish an event'
            def eventFutureWithFailure = exceptionallyCompletedSendFuture()
        when: 'handling legacy event callback'
            objectUnderTest.handleLegacyEventCallback('some-topic', eventFutureWithFailure)
        then: 'the correct error message is logged'
            assertFirstLoggedEvent(Level.ERROR, 'Unable to publish event')
    }

    def 'Convert to kafka headers'() {
        given: 'Few key value pairs'
            def someKeyValue = ['key1': 'value1', 'key2': 'value2']
        when: 'we convert to headers'
            def headers = objectUnderTest.convertToKafkaHeaders(someKeyValue)
        then: 'it is correctly converted'
            assert headers instanceof Headers
        and: 'also has correct values'
            assert headers[0].key() == 'key1'
            assert headers[1].key() == 'key2'
    }

    // Builds a future already completed with a successful SendResult for the given producer record.
    def successfullyCompletedSendFuture(producerRecord = new ProducerRecord('some-topic', 'some-value')) {
        return CompletableFuture.completedFuture(
            new SendResult(
                producerRecord,
                new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
            )
        )
    }

    // Builds a future completed exceptionally, simulating a kafka send failure.
    def exceptionallyCompletedSendFuture() {
        def sendFuture = new CompletableFuture<SendResult<String, String>>()
        sendFuture.completeExceptionally(new RuntimeException('some exception'))
        return sendFuture
    }

    // Asserts level and message of the first captured logging event; returns true so it can be used in a then-block.
    def assertFirstLoggedEvent(expectedLevel, expectedMessageFragment) {
        def firstLoggedEvent = logger.list[0]
        assert firstLoggedEvent.level == expectedLevel
        assert firstLoggedEvent.formattedMessage.contains(expectedMessageFragment)
        return true
    }

}