62210361cbaf26feb86ed50b6e1d2bb10ef669fd
[policy/clamp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2022, 2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.common.message.bus.event.kafka;
20
21 import static org.junit.jupiter.api.Assertions.assertEquals;
22 import static org.junit.jupiter.api.Assertions.assertFalse;
23 import static org.junit.jupiter.api.Assertions.assertNotNull;
24 import static org.junit.jupiter.api.Assertions.assertTrue;
25 import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SINK_TOPICS;
26 import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
27
28 import java.util.Deque;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Properties;
32 import org.junit.jupiter.api.AfterEach;
33 import org.junit.jupiter.api.BeforeEach;
34 import org.junit.jupiter.api.Test;
35 import org.onap.policy.common.message.bus.event.base.TopicPropertyBuilder;
36 import org.onap.policy.common.parameters.topic.BusTopicParams;
37
38 class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink> {
39
40     private SinkFactory factory;
41     public static final String KAFKA_SERVER = "localhost:9092";
42
43     /**
44      * Creates the object to be tested.
45      */
46     @BeforeEach
47     @Override
48     public void setUp() {
49         super.setUp();
50
51         factory = new SinkFactory();
52     }
53
54     @AfterEach
55     public void tearDown() {
56         factory.destroy();
57     }
58
59     @Test
60     @Override
61     public void testBuildBusTopicParams() {
62         super.testBuildBusTopicParams();
63         super.testBuildBusTopicParams_Ex();
64     }
65
66     @Test
67     @Override
68     public void testBuildListOfStringString() {
69         super.testBuildListOfStringString();
70
71         // check parameters that were used
72         BusTopicParams params = getLastParams();
73         assertFalse(params.isAllowSelfSignedCerts());
74     }
75
76     @Test
77     @Override
78     public void testBuildProperties() {
79         List<KafkaTopicSink> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
80         assertEquals(1, topics.size());
81         assertEquals(MY_TOPIC, topics.get(0).getTopic());
82         assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
83
84         BusTopicParams params = getLastParams();
85         assertTrue(params.isManaged());
86         assertFalse(params.isUseHttps());
87         assertEquals(List.of(KAFKA_SERVER), params.getServers());
88         assertEquals(MY_TOPIC, params.getTopic());
89         assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
90         assertEquals(MY_PARTITION, params.getPartitionId());
91         assertNotNull(params.getAdditionalProps());
92
93         List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
94             .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
95         assertEquals(1, topics2.size());
96         assertEquals(TOPIC3, topics2.get(0).getTopic());
97         assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic());
98
99         initFactory();
100
101         assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
102     }
103
104     @Test
105     void testBuildFromProperties() {
106         Properties props = makePropBuilder().makeTopic(MY_TOPIC).build();
107         var listTopic = factory.build(props);
108         assertNotNull(listTopic);
109     }
110
111     @Test
112     @Override
113     public void testDestroyString_testGet_testInventory() {
114         super.testDestroyString_testGet_testInventory();
115         super.testDestroyString_Ex();
116     }
117
118     @Test
119     @Override
120     public void testDestroy() {
121         super.testDestroy();
122     }
123
124     @Test
125     void testGet() {
126         super.testGet_Ex();
127     }
128
129     @Test
130     void testToString() {
131         assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory ["));
132     }
133
134     @Override
135     protected void initFactory() {
136         if (factory != null) {
137             factory.destroy();
138         }
139
140         factory = new SinkFactory();
141     }
142
143     @Override
144     protected List<KafkaTopicSink> buildTopics(Properties properties) {
145         return factory.build(properties);
146     }
147
148     @Override
149     protected KafkaTopicSink buildTopic(BusTopicParams params) {
150         return factory.build(params);
151     }
152
153     @Override
154     protected KafkaTopicSink buildTopic(List<String> servers, String topic) {
155         return factory.build(servers, topic);
156     }
157
158     @Override
159     protected void destroyFactory() {
160         factory.destroy();
161     }
162
163     @Override
164     protected void destroyTopic(String topic) {
165         factory.destroy(topic);
166     }
167
168     @Override
169     protected List<KafkaTopicSink> getInventory() {
170         return factory.inventory();
171     }
172
173     @Override
174     protected KafkaTopicSink getTopic(String topic) {
175         return factory.get(topic);
176     }
177
178     @Override
179     protected BusTopicParams getLastParams() {
180         return factory.params.getLast();
181     }
182
183     @Override
184     protected TopicPropertyBuilder makePropBuilder() {
185         return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS);
186     }
187
188     /**
189      * Factory that records the parameters of all the sinks it creates.
190      */
191     private static class SinkFactory extends IndexedKafkaTopicSinkFactory {
192         private Deque<BusTopicParams> params = new LinkedList<>();
193
194         @Override
195         protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
196             params.add(busTopicParams);
197             return super.makeSink(busTopicParams);
198         }
199     }
200 }