Changes for checkstyle 8.32
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-kafka / src / test / java / org / onap / policy / apex / plugins / event / carrier / kafka / KafkaCarrierTechnologyParametersTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Samsung. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.kafka;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertTrue;
28
29 import java.util.Properties;
30 import org.junit.Test;
31
32 public class KafkaCarrierTechnologyParametersTest {
33     @Test
34     public void testKafkaCarrierTechnologyParameters() {
35         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
36         assertNotNull(kafkaCarrierTechnologyParameters);
37
38         assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
39     }
40
41     @Test
42     public void testGetKafkaProducerProperties() {
43         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
44
45         Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
46         assertNotNull(kafkaProducerProperties);
47         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
48         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
49         assertEquals(null, kafkaProducerProperties.get("group.id"));
50         assertEquals(null, kafkaProducerProperties.get("Property0"));
51         assertEquals(null, kafkaProducerProperties.get("Property1"));
52         assertEquals(null, kafkaProducerProperties.get("Property2"));
53
54         // @formatter:off
55         String[][] kafkaProperties = {
56             {
57                 "Property0", "Value0"
58             },
59             {
60                 "Property1", "Value1"
61             }
62         };
63         // @formatter:on
64
65         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
66         kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
67         assertNotNull(kafkaProducerProperties);
68         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
69         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
70         assertEquals(null, kafkaProducerProperties.get("group.id"));
71         assertEquals("Value0", kafkaProducerProperties.get("Property0"));
72         assertEquals("Value1", kafkaProducerProperties.get("Property1"));
73         assertEquals(null, kafkaProducerProperties.get("Property2"));
74     }
75
76     @Test
77     public void testGetKafkaConsumerProperties() {
78         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
79
80         Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
81         assertNotNull(kafkaConsumerProperties);
82         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
83         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
84         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
85         assertEquals(null, kafkaConsumerProperties.get("Property0"));
86         assertEquals(null, kafkaConsumerProperties.get("Property1"));
87         assertEquals(null, kafkaConsumerProperties.get("Property2"));
88
89         // @formatter:off
90         String[][] kafkaProperties = {
91             {
92                 "Property0", "Value0"
93             },
94             {
95                 "Property1", "Value1"
96             }
97         };
98         // @formatter:on
99
100         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
101         kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
102         assertNotNull(kafkaConsumerProperties);
103         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
104         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
105         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
106         assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
107         assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
108         assertEquals(null, kafkaConsumerProperties.get("Property2"));
109     }
110
111     @Test
112     public void testValidate() {
113         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
114         assertNotNull(kafkaCarrierTechnologyParameters);
115
116         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
117
118         String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
119         kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
120         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
121         kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
122         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
123
124         origStringValue = kafkaCarrierTechnologyParameters.getAcks();
125         kafkaCarrierTechnologyParameters.setAcks(" ");
126         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
127         kafkaCarrierTechnologyParameters.setAcks(origStringValue);
128         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
129
130         origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
131         kafkaCarrierTechnologyParameters.setGroupId(" ");
132         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
133         kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
134         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
135
136         origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
137         kafkaCarrierTechnologyParameters.setProducerTopic(" ");
138         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
139         kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
140         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
141
142         origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
143         kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
144         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
145         kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
146         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
147
148         int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
149         kafkaCarrierTechnologyParameters.setRetries(-1);
150         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
151         kafkaCarrierTechnologyParameters.setRetries(origIntValue);
152         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
153
154         origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
155         kafkaCarrierTechnologyParameters.setBatchSize(-1);
156         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
157         kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
158         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
159
160         origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
161         kafkaCarrierTechnologyParameters.setLingerTime(-1);
162         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
163         kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
164         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
165
166         long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
167         kafkaCarrierTechnologyParameters.setBufferMemory(-1);
168         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
169         kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
170         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
171
172         origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
173         kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
174         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
175         kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
176         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
177
178         origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
179         kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
180         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
181         kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
182         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
183
184         origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
185         kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
186         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
187         kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
188         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
189
190         origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
191         kafkaCarrierTechnologyParameters.setKeySerializer(" ");
192         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
193         kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
194         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
195
196         origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
197         kafkaCarrierTechnologyParameters.setValueSerializer(" ");
198         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
199         kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
200         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
201
202         origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
203         kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
204         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
205         kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
206         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
207
208         origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
209         kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
210         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
211         kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
212         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
213
214         String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
215         kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
216         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
217         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
218         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
219
220         kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
221         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
222         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
223         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
224
225         String[] blankStringList = { null, "" };
226         kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
227         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
228         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
229         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
230
231         String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
232         kafkaCarrierTechnologyParameters.setKafkaProperties(null);
233         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
234         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
235         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
236
237         kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
238         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
239         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
240         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
241
242         // @formatter:offkafkaCarrierTechnologyParameters
243         String[][] kafkaProperties0 = {
244             {
245                 null, "Value0"
246             }
247         };
248         // @formatter:on
249
250         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
251         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
252         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
253         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
254
255         // @formatter:off
256         String[][] kafkaProperties1 = {
257             {
258                 "Property1", null
259             }
260         };
261         // @formatter:on
262
263         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
264         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
265         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
266         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
267
268         // @formatter:off
269         String[][] kafkaProperties2 = {
270             {
271                 "Property1", null
272             }
273         };
274         // @formatter:on
275
276         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
277         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
278         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
279         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
280
281         // @formatter:off
282         String[][] kafkaProperties3 = {
283             {
284                 "Property1", "Value0", "Value1"
285             }
286         };
287         // @formatter:on
288
289         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
290         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
291         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
292         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
293     }
294
295     @Test
296     public void testExplicitImplicit() {
297         KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
298         assertNotNull(kafkaCtp);
299
300         assertTrue(kafkaCtp.validate().isValid());
301
302         // @formatter:off
303         assertEquals("localhost:9092",   kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
304         assertEquals("all",              kafkaCtp.getKafkaProducerProperties().get("acks"));
305         assertEquals("0",                kafkaCtp.getKafkaProducerProperties().get("retries"));
306         assertEquals("16384",            kafkaCtp.getKafkaProducerProperties().get("batch.size"));
307         assertEquals("1",                kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
308         assertEquals("33554432",         kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
309         assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
310         assertEquals("true",             kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
311         assertEquals("1000",             kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
312         assertEquals("30000",            kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
313         // @formatter:on
314
315         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
316                 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
317         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
318                 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
319         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
320                 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
321         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
322                 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
323         assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
324                 kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
325
326         // @formatter:off
327         String[][] kafkaProperties0 = {
328             {
329                 "bootstrap.servers", "localhost:9092"
330             }
331         };
332         // @formatter:on
333
334         kafkaCtp.setBootstrapServers(null);
335         kafkaCtp.setKafkaProperties(kafkaProperties0);
336         assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
337
338         // @formatter:off
339         String[][] kafkaProperties1 = {
340             {
341                 "bootstrap.servers", "localhost:9999"
342             }
343         };
344         // @formatter:on
345
346         kafkaCtp = new KafkaCarrierTechnologyParameters();
347         kafkaCtp.setKafkaProperties(kafkaProperties1);
348         assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
349
350         // @formatter:off
351         String[][] kafkaProperties2 = {
352             {
353                 "bootstrap.servers", "localhost:8888"
354             }
355         };
356         // @formatter:on
357
358         kafkaCtp = new KafkaCarrierTechnologyParameters();
359         kafkaCtp.setBootstrapServers("localhost:9092");
360         kafkaCtp.setKafkaProperties(kafkaProperties2);
361         assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
362
363         // @formatter:off
364         String[][] kafkaProperties3 = {
365             {
366                 "bootstrap.servers", "localhost:5555"
367             }
368         };
369         // @formatter:on
370
371         kafkaCtp = new KafkaCarrierTechnologyParameters();
372         kafkaCtp.setBootstrapServers("localhost:7777");
373         kafkaCtp.setKafkaProperties(kafkaProperties3);
374         assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
375     }
376 }