14f13bc97fa6c4860fbf2975ac914b9c2176a62e
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Samsung. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.plugins.event.carrier.kafka;
24
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertTrue;
29
30 import java.util.Properties;
31 import org.junit.Test;
32
33 public class KafkaCarrierTechnologyParametersTest {
34     @Test
35     public void testKafkaCarrierTechnologyParameters() {
36         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
37         assertNotNull(kafkaCarrierTechnologyParameters);
38
39         assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
40     }
41
42     @Test
43     public void testGetKafkaProducerProperties() {
44         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
45
46         Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
47         assertNotNull(kafkaProducerProperties);
48         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
49         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
50         assertEquals(null, kafkaProducerProperties.get("group.id"));
51         assertEquals(null, kafkaProducerProperties.get("Property0"));
52         assertEquals(null, kafkaProducerProperties.get("Property1"));
53         assertEquals(null, kafkaProducerProperties.get("Property2"));
54
55         // @formatter:off
56         String[][] kafkaProperties = {
57             {
58                 "Property0", "Value0"
59             },
60             {
61                 "Property1", "Value1"
62             }
63         };
64         // @formatter:on
65
66         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
67         kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
68         assertNotNull(kafkaProducerProperties);
69         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
70         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
71         assertEquals(null, kafkaProducerProperties.get("group.id"));
72         assertEquals("Value0", kafkaProducerProperties.get("Property0"));
73         assertEquals("Value1", kafkaProducerProperties.get("Property1"));
74         assertEquals(null, kafkaProducerProperties.get("Property2"));
75     }
76
77     @Test
78     public void testGetKafkaConsumerProperties() {
79         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
80
81         Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
82         assertNotNull(kafkaConsumerProperties);
83         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
84         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
85         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
86         assertEquals(null, kafkaConsumerProperties.get("Property0"));
87         assertEquals(null, kafkaConsumerProperties.get("Property1"));
88         assertEquals(null, kafkaConsumerProperties.get("Property2"));
89
90         // @formatter:off
91         String[][] kafkaProperties = {
92             {
93                 "Property0", "Value0"
94             },
95             {
96                 "Property1", "Value1"
97             }
98         };
99         // @formatter:on
100
101         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
102         kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
103         assertNotNull(kafkaConsumerProperties);
104         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
105         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
106         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
107         assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
108         assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
109         assertEquals(null, kafkaConsumerProperties.get("Property2"));
110     }
111
112     @Test
113     public void testValidate() {
114         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
115         assertNotNull(kafkaCarrierTechnologyParameters);
116
117         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
118
119         String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
120         kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
121         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
122         kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
123         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
124
125         origStringValue = kafkaCarrierTechnologyParameters.getAcks();
126         kafkaCarrierTechnologyParameters.setAcks(" ");
127         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
128         kafkaCarrierTechnologyParameters.setAcks(origStringValue);
129         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
130
131         origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
132         kafkaCarrierTechnologyParameters.setGroupId(" ");
133         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
134         kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
135         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
136
137         origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
138         kafkaCarrierTechnologyParameters.setProducerTopic(" ");
139         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
140         kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
141         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
142
143         origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
144         kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
145         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
146         kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
147         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
148
149         int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
150         kafkaCarrierTechnologyParameters.setRetries(-1);
151         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
152         kafkaCarrierTechnologyParameters.setRetries(origIntValue);
153         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
154
155         origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
156         kafkaCarrierTechnologyParameters.setBatchSize(-1);
157         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
158         kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
159         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
160
161         origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
162         kafkaCarrierTechnologyParameters.setLingerTime(-1);
163         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
164         kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
165         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
166
167         long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
168         kafkaCarrierTechnologyParameters.setBufferMemory(-1);
169         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
170         kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
171         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
172
173         origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
174         kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
175         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
176         kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
177         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
178
179         origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
180         kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
181         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
182         kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
183         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
184
185         origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
186         kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
187         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
188         kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
189         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
190
191         origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
192         kafkaCarrierTechnologyParameters.setKeySerializer(" ");
193         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
194         kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
195         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
196
197         origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
198         kafkaCarrierTechnologyParameters.setValueSerializer(" ");
199         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
200         kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
201         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
202
203         origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
204         kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
205         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
206         kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
207         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
208
209         origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
210         kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
211         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
212         kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
213         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
214
215         String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
216         kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
217         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
218         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
219         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
220
221         kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
222         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
223         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
224         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
225
226         String[] blankStringList = { null, "" };
227         kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
228         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
229         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
230         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
231
232         String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
233         kafkaCarrierTechnologyParameters.setKafkaProperties(null);
234         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
235         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
236         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
237
238         kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
239         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
240         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
241         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
242
243         // @formatter:offkafkaCarrierTechnologyParameters
244         String[][] kafkaProperties0 = {
245             {
246                 null, "Value0"
247             }
248         };
249         // @formatter:on
250
251         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
252         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
253         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
254         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
255
256         // @formatter:off
257         String[][] kafkaProperties1 = {
258             {
259                 "Property1", null
260             }
261         };
262         // @formatter:on
263
264         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
265         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
266         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
267         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
268
269         // @formatter:off
270         String[][] kafkaProperties2 = {
271             {
272                 "Property1", null
273             }
274         };
275         // @formatter:on
276
277         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
278         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
279
280         // @formatter:off
281         String[][] kafkaPropertiesWithEmptyValue = {
282             {
283                 "Property1", ""
284             }
285         };
286         // @formatter:on
287
288         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue);
289         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
290
291         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
292         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
293
294         // @formatter:off
295         String[][] kafkaProperties3 = {
296             {
297                 "Property1", "Value0", "Value1"
298             }
299         };
300         // @formatter:on
301
302         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
303         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
304         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
305         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
306     }
307
308     @Test
309     public void testExplicitImplicit() {
310         KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
311         assertNotNull(kafkaCtp);
312
313         assertTrue(kafkaCtp.validate().isValid());
314
315         // @formatter:off
316         assertEquals("localhost:9092",   kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
317         assertEquals("all",              kafkaCtp.getKafkaProducerProperties().get("acks"));
318         assertEquals("0",                kafkaCtp.getKafkaProducerProperties().get("retries"));
319         assertEquals("16384",            kafkaCtp.getKafkaProducerProperties().get("batch.size"));
320         assertEquals("1",                kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
321         assertEquals("33554432",         kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
322         assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
323         assertEquals("true",             kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
324         assertEquals("1000",             kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
325         assertEquals("30000",            kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
326         // @formatter:on
327
328         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
329                 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
330         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
331                 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
332         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
333                 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
334         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
335                 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
336         assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
337                 kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
338
339         // @formatter:off
340         String[][] kafkaProperties0 = {
341             {
342                 "bootstrap.servers", "localhost:9092"
343             }
344         };
345         // @formatter:on
346
347         kafkaCtp.setBootstrapServers(null);
348         kafkaCtp.setKafkaProperties(kafkaProperties0);
349         assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
350
351         // @formatter:off
352         String[][] kafkaProperties1 = {
353             {
354                 "bootstrap.servers", "localhost:9999"
355             }
356         };
357         // @formatter:on
358
359         kafkaCtp = new KafkaCarrierTechnologyParameters();
360         kafkaCtp.setKafkaProperties(kafkaProperties1);
361         assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
362
363         // @formatter:off
364         String[][] kafkaProperties2 = {
365             {
366                 "bootstrap.servers", "localhost:8888"
367             }
368         };
369         // @formatter:on
370
371         kafkaCtp = new KafkaCarrierTechnologyParameters();
372         kafkaCtp.setBootstrapServers("localhost:9092");
373         kafkaCtp.setKafkaProperties(kafkaProperties2);
374         assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
375
376         // @formatter:off
377         String[][] kafkaProperties3 = {
378             {
379                 "bootstrap.servers", "localhost:5555"
380             }
381         };
382         // @formatter:on
383
384         kafkaCtp = new KafkaCarrierTechnologyParameters();
385         kafkaCtp.setBootstrapServers("localhost:7777");
386         kafkaCtp.setKafkaProperties(kafkaProperties3);
387         assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
388     }
389 }