6b0f7d9206557fe4a9a51ab6e017c0e565cb4775
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Samsung. All rights reserved.
4  *  Modifications Copyright (C) 2019,2023 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.plugins.event.carrier.kafka;
24
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertTrue;
29
30 import java.util.Properties;
31 import org.junit.Test;
32
33 public class KafkaCarrierTechnologyParametersTest {
34     @Test
35     public void testKafkaCarrierTechnologyParameters() {
36         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
37         assertNotNull(kafkaCarrierTechnologyParameters);
38
39         assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
40     }
41
42     @Test
43     public void testGetKafkaProducerProperties() {
44         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
45
46         Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
47         assertNotNull(kafkaProducerProperties);
48         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
49         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
50         assertEquals(null, kafkaProducerProperties.get("group.id"));
51         assertEquals(null, kafkaProducerProperties.get("Property0"));
52         assertEquals(null, kafkaProducerProperties.get("Property1"));
53         assertEquals(null, kafkaProducerProperties.get("Property2"));
54
55         // @formatter:off
56         String[][] kafkaProperties = {
57             {
58                 "Property0", "Value0"
59             },
60             {
61                 "Property1", "Value1"
62             }
63         };
64         // @formatter:on
65
66         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
67         kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
68         assertNotNull(kafkaProducerProperties);
69         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
70         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
71         assertEquals(null, kafkaProducerProperties.get("group.id"));
72         assertEquals("Value0", kafkaProducerProperties.get("Property0"));
73         assertEquals("Value1", kafkaProducerProperties.get("Property1"));
74         assertEquals(null, kafkaProducerProperties.get("Property2"));
75     }
76
77     @Test
78     public void testGetKafkaConsumerProperties() {
79         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
80
81         Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
82         assertNotNull(kafkaConsumerProperties);
83         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
84         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
85         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
86         assertEquals(null, kafkaConsumerProperties.get("Property0"));
87         assertEquals(null, kafkaConsumerProperties.get("Property1"));
88         assertEquals(null, kafkaConsumerProperties.get("Property2"));
89
90         // @formatter:off
91         String[][] kafkaProperties = {
92             {
93                 "Property0", "Value0"
94             },
95             {
96                 "Property1", "Value1"
97             }
98         };
99         // @formatter:on
100
101         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
102         kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
103         assertNotNull(kafkaConsumerProperties);
104         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
105         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
106         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
107         assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
108         assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
109         assertEquals(null, kafkaConsumerProperties.get("Property2"));
110     }
111
112     @Test
113     public void testValidate() {
114         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
115         assertNotNull(kafkaCarrierTechnologyParameters);
116
117         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
118
119         String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
120         kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
121         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
122         kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
123         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
124
125         origStringValue = kafkaCarrierTechnologyParameters.getAcks();
126         kafkaCarrierTechnologyParameters.setAcks(" ");
127         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
128         kafkaCarrierTechnologyParameters.setAcks(origStringValue);
129         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
130
131         origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
132         kafkaCarrierTechnologyParameters.setGroupId(" ");
133         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
134         kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
135         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
136
137         origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
138         kafkaCarrierTechnologyParameters.setProducerTopic(" ");
139         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
140         kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
141         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
142
143         int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
144         kafkaCarrierTechnologyParameters.setRetries(-1);
145         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
146         kafkaCarrierTechnologyParameters.setRetries(origIntValue);
147         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
148
149         origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
150         kafkaCarrierTechnologyParameters.setBatchSize(-1);
151         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
152         kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
153         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
154
155         origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
156         kafkaCarrierTechnologyParameters.setLingerTime(-1);
157         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
158         kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
159         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
160
161         long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
162         kafkaCarrierTechnologyParameters.setBufferMemory(-1);
163         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
164         kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
165         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
166
167         origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
168         kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
169         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
170         kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
171         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
172
173         origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
174         kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
175         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
176         kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
177         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
178
179         origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
180         kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
181         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
182         kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
183         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
184
185         origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
186         kafkaCarrierTechnologyParameters.setKeySerializer(" ");
187         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
188         kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
189         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
190
191         origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
192         kafkaCarrierTechnologyParameters.setValueSerializer(" ");
193         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
194         kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
195         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
196
197         origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
198         kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
199         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
200         kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
201         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
202
203         origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
204         kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
205         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
206         kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
207         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
208
209         String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
210         kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
211         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
212         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
213         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
214
215         kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
216         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
217         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
218         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
219
220         String[] blankStringList = { null, "" };
221         kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
222         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
223         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
224         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
225
226         String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
227         kafkaCarrierTechnologyParameters.setKafkaProperties(null);
228         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
229         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
230         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
231
232         kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
233         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
234         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
235         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
236
237         // @formatter:offkafkaCarrierTechnologyParameters
238         String[][] kafkaProperties0 = {
239             {
240                 null, "Value0"
241             }
242         };
243         // @formatter:on
244
245         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
246         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
247         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
248         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
249
250         // @formatter:off
251         String[][] kafkaProperties1 = {
252             {
253                 "Property1", null
254             }
255         };
256         // @formatter:on
257
258         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
259         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
260         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
261         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
262
263         // @formatter:off
264         String[][] kafkaProperties2 = {
265             {
266                 "Property1", null
267             }
268         };
269         // @formatter:on
270
271         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
272         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
273
274         // @formatter:off
275         String[][] kafkaPropertiesWithEmptyValue = {
276             {
277                 "Property1", ""
278             }
279         };
280         // @formatter:on
281
282         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue);
283         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
284
285         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
286         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
287
288         // @formatter:off
289         String[][] kafkaProperties3 = {
290             {
291                 "Property1", "Value0", "Value1"
292             }
293         };
294         // @formatter:on
295
296         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
297         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
298         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
299         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
300     }
301
302     @Test
303     public void testExplicitImplicit() {
304         KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
305         assertNotNull(kafkaCtp);
306
307         assertTrue(kafkaCtp.validate().isValid());
308
309         // @formatter:off
310         assertEquals("localhost:9092",   kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
311         assertEquals("all",              kafkaCtp.getKafkaProducerProperties().get("acks"));
312         assertEquals("0",                kafkaCtp.getKafkaProducerProperties().get("retries"));
313         assertEquals("16384",            kafkaCtp.getKafkaProducerProperties().get("batch.size"));
314         assertEquals("1",                kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
315         assertEquals("33554432",         kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
316         assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
317         assertEquals("true",             kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
318         assertEquals("1000",             kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
319         assertEquals("30000",            kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
320         // @formatter:on
321
322         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
323                 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
324         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
325                 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
326         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
327                 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
328         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
329                 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
330
331         // @formatter:off
332         String[][] kafkaProperties0 = {
333             {
334                 "bootstrap.servers", "localhost:9092"
335             }
336         };
337         // @formatter:on
338
339         kafkaCtp.setBootstrapServers(null);
340         kafkaCtp.setKafkaProperties(kafkaProperties0);
341         assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
342
343         // @formatter:off
344         String[][] kafkaProperties1 = {
345             {
346                 "bootstrap.servers", "localhost:9999"
347             }
348         };
349         // @formatter:on
350
351         kafkaCtp = new KafkaCarrierTechnologyParameters();
352         kafkaCtp.setKafkaProperties(kafkaProperties1);
353         assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
354
355         // @formatter:off
356         String[][] kafkaProperties2 = {
357             {
358                 "bootstrap.servers", "localhost:8888"
359             }
360         };
361         // @formatter:on
362
363         kafkaCtp = new KafkaCarrierTechnologyParameters();
364         kafkaCtp.setBootstrapServers("localhost:9092");
365         kafkaCtp.setKafkaProperties(kafkaProperties2);
366         assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
367
368         // @formatter:off
369         String[][] kafkaProperties3 = {
370             {
371                 "bootstrap.servers", "localhost:5555"
372             }
373         };
374         // @formatter:on
375
376         kafkaCtp = new KafkaCarrierTechnologyParameters();
377         kafkaCtp.setBootstrapServers("localhost:7777");
378         kafkaCtp.setKafkaProperties(kafkaProperties3);
379         assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
380     }
381 }