5b6ec33c622dce6087b502575443e7e0ab3b5db8
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-kafka / src / test / java / org / onap / policy / apex / plugins / event / carrier / kafka / KafkaCarrierTechnologyParametersTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Samsung. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.kafka;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertTrue;
28
29 import java.util.Properties;
30
31 import org.junit.Test;
32
33 public class KafkaCarrierTechnologyParametersTest {
34     @Test
35     public void testKafkaCarrierTechnologyParameters() {
36         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
37         assertNotNull(kafkaCarrierTechnologyParameters);
38
39         assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
40     }
41
42     @Test
43     public void testGetKafkaProducerProperties() {
44         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
45
46         Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
47         assertNotNull(kafkaProducerProperties);
48         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
49         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
50         assertEquals(null, kafkaProducerProperties.get("group.id"));
51         assertEquals(null, kafkaProducerProperties.get("Property0"));
52         assertEquals(null, kafkaProducerProperties.get("Property1"));
53         assertEquals(null, kafkaProducerProperties.get("Property2"));
54
55         // @formatter:off
56         String[][] kafkaProperties = {
57             {
58                 "Property0", "Value0"
59             },
60             {
61                 "Property1", "Value1"
62             }
63         };
64         // @formatter:on
65
66         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
67         kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
68         assertNotNull(kafkaProducerProperties);
69         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
70         assertEquals("1", kafkaProducerProperties.get("linger.ms"));
71         assertEquals(null, kafkaProducerProperties.get("group.id"));
72         assertEquals("Value0", kafkaProducerProperties.get("Property0"));
73         assertEquals("Value1", kafkaProducerProperties.get("Property1"));
74         assertEquals(null, kafkaProducerProperties.get("Property2"));
75     }
76
77     @Test
78     public void testGetKafkaConsumerProperties() {
79         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
80
81         Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
82         assertNotNull(kafkaConsumerProperties);
83         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
84         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
85         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
86         assertEquals(null, kafkaConsumerProperties.get("Property0"));
87         assertEquals(null, kafkaConsumerProperties.get("Property1"));
88         assertEquals(null, kafkaConsumerProperties.get("Property2"));
89
90         // @formatter:off
91         String[][] kafkaProperties = {
92             {
93                 "Property0", "Value0"
94             },
95             {
96                 "Property1", "Value1"
97             }
98         };
99         // @formatter:on
100
101         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
102         kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
103         assertNotNull(kafkaConsumerProperties);
104         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
105         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
106         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
107         assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
108         assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
109         assertEquals(null, kafkaConsumerProperties.get("Property2"));
110     }
111
112     @Test
113     public void testValidate() {
114         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
115         assertNotNull(kafkaCarrierTechnologyParameters);
116
117         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
118
119         String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
120         kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
121         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
122         kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
123         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
124
125         origStringValue = kafkaCarrierTechnologyParameters.getAcks();
126         kafkaCarrierTechnologyParameters.setAcks(" ");
127         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
128         kafkaCarrierTechnologyParameters.setAcks(origStringValue);
129         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
130
131         origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
132         kafkaCarrierTechnologyParameters.setGroupId(" ");
133         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
134         kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
135         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
136
137         origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
138         kafkaCarrierTechnologyParameters.setProducerTopic(" ");
139         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
140         kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
141         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
142
143         origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
144         kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
145         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
146         kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
147         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
148
149         int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
150         kafkaCarrierTechnologyParameters.setRetries(-1);
151         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
152         kafkaCarrierTechnologyParameters.setRetries(origIntValue);
153         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
154
155         origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
156         kafkaCarrierTechnologyParameters.setBatchSize(-1);
157         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
158         kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
159         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
160
161         origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
162         kafkaCarrierTechnologyParameters.setLingerTime(-1);
163         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
164         kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
165         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
166
167         long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
168         kafkaCarrierTechnologyParameters.setBufferMemory(-1);
169         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
170         kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
171         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
172
173         origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
174         kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
175         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
176         kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
177         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
178
179         origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
180         kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
181         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
182         kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
183         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
184
185         origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
186         kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
187         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
188         kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
189         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
190
191         origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
192         kafkaCarrierTechnologyParameters.setKeySerializer(" ");
193         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
194         kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
195         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
196
197         origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
198         kafkaCarrierTechnologyParameters.setValueSerializer(" ");
199         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
200         kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
201         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
202
203         origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
204         kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
205         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
206         kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
207         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
208
209         origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
210         kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
211         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
212         kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
213         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
214
215         String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
216         kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
217         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
218         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
219         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
220
221         kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
222         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
223         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
224         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
225
226         String[] blankStringList = { null, "" };
227         kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
228         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
229         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
230         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
231
232         String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
233         kafkaCarrierTechnologyParameters.setKafkaProperties(null);
234         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
235         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
236         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
237
238         kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
239         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
240         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
241         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
242
243         // @formatter:offkafkaCarrierTechnologyParameters
244         String[][] kafkaProperties0 = {
245             {
246                 null, "Value0"
247             }
248         };
249         // @formatter:on
250
251         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
252         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
253         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
254         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
255
256         // @formatter:off
257         String[][] kafkaProperties1 = {
258             {
259                 "Property1", null
260             }
261         };
262         // @formatter:on
263
264         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
265         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
266         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
267         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
268
269         // @formatter:off
270         String[][] kafkaProperties2 = {
271             {
272                 "Property1", null
273             }
274         };
275         // @formatter:on
276
277         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
278         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
279         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
280         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
281
282         // @formatter:off
283         String[][] kafkaProperties3 = {
284             {
285                 "Property1", "Value0", "Value1"
286             }
287         };
288         // @formatter:on
289
290         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
291         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
292         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
293         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
294     }
295
296     @Test
297     public void testExplicitImplicit() {
298         KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
299         assertNotNull(kafkaCtp);
300
301         assertTrue(kafkaCtp.validate().isValid());
302
303         // @formatter:off
304         assertEquals("localhost:9092",   kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
305         assertEquals("all",              kafkaCtp.getKafkaProducerProperties().get("acks"));
306         assertEquals("0",                kafkaCtp.getKafkaProducerProperties().get("retries"));
307         assertEquals("16384",            kafkaCtp.getKafkaProducerProperties().get("batch.size"));
308         assertEquals("1",                kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
309         assertEquals("33554432",         kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
310         assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
311         assertEquals("true",             kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
312         assertEquals("1000",             kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
313         assertEquals("30000",            kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
314         // @formatter:on
315
316         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
317                 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
318         assertEquals("org.apache.kafka.common.serialization.StringSerializer",
319                 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
320         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
321                 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
322         assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
323                 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
324         assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
325                 kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
326
327         // @formatter:off
328         String[][] kafkaProperties0 = {
329             {
330                 "bootstrap.servers", "localhost:9092"
331             }
332         };
333         // @formatter:on
334
335         kafkaCtp.setBootstrapServers(null);
336         kafkaCtp.setKafkaProperties(kafkaProperties0);
337         assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
338
339         // @formatter:off
340         String[][] kafkaProperties1 = {
341             {
342                 "bootstrap.servers", "localhost:9999"
343             }
344         };
345         // @formatter:on
346
347         kafkaCtp = new KafkaCarrierTechnologyParameters();
348         kafkaCtp.setKafkaProperties(kafkaProperties1);
349         assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
350
351         // @formatter:off
352         String[][] kafkaProperties2 = {
353             {
354                 "bootstrap.servers", "localhost:8888"
355             }
356         };
357         // @formatter:on
358
359         kafkaCtp = new KafkaCarrierTechnologyParameters();
360         kafkaCtp.setBootstrapServers("localhost:9092");
361         kafkaCtp.setKafkaProperties(kafkaProperties2);
362         assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
363
364         // @formatter:off
365         String[][] kafkaProperties3 = {
366             {
367                 "bootstrap.servers", "localhost:5555"
368             }
369         };
370         // @formatter:on
371
372         kafkaCtp = new KafkaCarrierTechnologyParameters();
373         kafkaCtp.setBootstrapServers("localhost:7777");
374         kafkaCtp.setKafkaProperties(kafkaProperties3);
375         assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
376     }
377 }