Async: NCMP Rest impl. including Request ID generation
[cps.git] / cps-service / src / test / groovy / org / onap / cps / notification / KafkaPublisherSpecBase.groovy
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021 Bell Canada. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  *
16  *  SPDX-License-Identifier: Apache-2.0
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.onap.cps.notification
21
22 import org.springframework.beans.factory.annotation.Autowired
23 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
24 import org.springframework.boot.test.context.SpringBootTest
25 import org.springframework.kafka.config.TopicBuilder
26 import org.springframework.kafka.core.ConsumerFactory
27 import org.springframework.kafka.core.KafkaAdmin
28 import org.springframework.kafka.core.KafkaTemplate
29 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
30 import org.springframework.kafka.listener.ContainerProperties
31 import org.springframework.kafka.listener.MessageListener
32 import org.springframework.kafka.test.utils.ContainerTestUtils
33 import org.springframework.test.context.ContextConfiguration
34 import org.springframework.test.context.DynamicPropertyRegistry
35 import org.springframework.test.context.DynamicPropertySource
36 import spock.lang.Shared
37 import spock.lang.Specification
38
39 @ContextConfiguration(classes = [KafkaAutoConfiguration, KafkaProducerListener, NotificationErrorHandler])
40 @SpringBootTest
41 class KafkaPublisherSpecBase extends Specification {
42
43     @Autowired
44     KafkaTemplate kafkaTemplate
45
46     @Autowired
47     KafkaAdmin kafkaAdmin
48
49     @Autowired
50     ConsumerFactory consumerFactory
51
52     @Shared volatile topicCreated = false
53     @Shared consumedMessages = new ArrayList<>()
54
55     def cpsEventTopic = 'cps-events'
56
57     @DynamicPropertySource
58     static void registerKafkaProperties(DynamicPropertyRegistry registry) {
59         registry.add("spring.kafka.bootstrap-servers", KafkaTestContainerConfig::getBootstrapServers)
60     }
61
62     def setup() {
63         // Kafka listener and topic should be created only once for a test-suite.
64         // We are also dependent on sprint context to achieve it, and can not execute it in setupSpec
65         if (!topicCreated) {
66             kafkaAdmin.createOrModifyTopics(TopicBuilder.name(cpsEventTopic).partitions(1).replicas(1).build())
67             startListeningToTopic()
68             topicCreated = true
69         }
70         /* kafka message listener stores the messages to consumedMessages.
71             It is important to clear the list before each test case so that test cases can fetch the message from index '0'.
72          */
73         consumedMessages.clear()
74     }
75
76     def startListeningToTopic() {
77         ContainerProperties containerProperties = new ContainerProperties(cpsEventTopic)
78         containerProperties.setMessageListener([
79                 onMessage: {
80                     record ->
81                         consumedMessages.add(record.value())
82                 }] as MessageListener)
83
84         ConcurrentMessageListenerContainer container =
85                 new ConcurrentMessageListenerContainer<>(
86                         consumerFactory,
87                         containerProperties)
88
89         container.start()
90         ContainerTestUtils.waitForAssignment(container, 1)
91     }
92
93 }