2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Samsung. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.plugins.event.carrier.kafka;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertTrue;
30 import java.util.Properties;
31 import org.junit.Test;
33 public class KafkaCarrierTechnologyParametersTest {
35 public void testKafkaCarrierTechnologyParameters() {
36 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
37 assertNotNull(kafkaCarrierTechnologyParameters);
39 assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
43 public void testGetKafkaProducerProperties() {
44 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
46 Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
47 assertNotNull(kafkaProducerProperties);
48 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
49 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
50 assertEquals(null, kafkaProducerProperties.get("group.id"));
51 assertEquals(null, kafkaProducerProperties.get("Property0"));
52 assertEquals(null, kafkaProducerProperties.get("Property1"));
53 assertEquals(null, kafkaProducerProperties.get("Property2"));
56 String[][] kafkaProperties = {
66 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
67 kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
68 assertNotNull(kafkaProducerProperties);
69 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
70 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
71 assertEquals(null, kafkaProducerProperties.get("group.id"));
72 assertEquals("Value0", kafkaProducerProperties.get("Property0"));
73 assertEquals("Value1", kafkaProducerProperties.get("Property1"));
74 assertEquals(null, kafkaProducerProperties.get("Property2"));
78 public void testGetKafkaConsumerProperties() {
79 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
81 Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
82 assertNotNull(kafkaConsumerProperties);
83 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
84 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
85 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
86 assertEquals(null, kafkaConsumerProperties.get("Property0"));
87 assertEquals(null, kafkaConsumerProperties.get("Property1"));
88 assertEquals(null, kafkaConsumerProperties.get("Property2"));
91 String[][] kafkaProperties = {
101 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
102 kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
103 assertNotNull(kafkaConsumerProperties);
104 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
105 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
106 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
107 assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
108 assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
109 assertEquals(null, kafkaConsumerProperties.get("Property2"));
113 public void testValidate() {
114 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
115 assertNotNull(kafkaCarrierTechnologyParameters);
117 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
119 String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
120 kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
121 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
122 kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
123 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
125 origStringValue = kafkaCarrierTechnologyParameters.getAcks();
126 kafkaCarrierTechnologyParameters.setAcks(" ");
127 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
128 kafkaCarrierTechnologyParameters.setAcks(origStringValue);
129 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
131 origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
132 kafkaCarrierTechnologyParameters.setGroupId(" ");
133 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
134 kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
135 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
137 origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
138 kafkaCarrierTechnologyParameters.setProducerTopic(" ");
139 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
140 kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
141 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
143 origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
144 kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
145 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
146 kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
147 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
149 int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
150 kafkaCarrierTechnologyParameters.setRetries(-1);
151 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
152 kafkaCarrierTechnologyParameters.setRetries(origIntValue);
153 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
155 origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
156 kafkaCarrierTechnologyParameters.setBatchSize(-1);
157 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
158 kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
159 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
161 origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
162 kafkaCarrierTechnologyParameters.setLingerTime(-1);
163 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
164 kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
165 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
167 long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
168 kafkaCarrierTechnologyParameters.setBufferMemory(-1);
169 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
170 kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
171 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
173 origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
174 kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
175 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
176 kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
177 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
179 origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
180 kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
181 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
182 kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
183 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
185 origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
186 kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
187 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
188 kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
189 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
191 origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
192 kafkaCarrierTechnologyParameters.setKeySerializer(" ");
193 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
194 kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
195 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
197 origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
198 kafkaCarrierTechnologyParameters.setValueSerializer(" ");
199 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
200 kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
201 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
203 origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
204 kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
205 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
206 kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
207 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
209 origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
210 kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
211 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
212 kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
213 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
215 String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
216 kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
217 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
218 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
219 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
221 kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
222 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
223 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
224 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
226 String[] blankStringList = { null, "" };
227 kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
228 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
229 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
230 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
232 String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
233 kafkaCarrierTechnologyParameters.setKafkaProperties(null);
234 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
235 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
236 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
238 kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
239 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
240 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
241 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
243 // @formatter:offkafkaCarrierTechnologyParameters
244 String[][] kafkaProperties0 = {
251 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
252 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
253 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
254 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
257 String[][] kafkaProperties1 = {
264 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
265 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
266 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
267 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
270 String[][] kafkaProperties2 = {
277 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
278 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
281 String[][] kafkaPropertiesWithEmptyValue = {
288 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue);
289 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
291 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
292 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
295 String[][] kafkaProperties3 = {
297 "Property1", "Value0", "Value1"
302 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
303 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
304 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
305 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
309 public void testExplicitImplicit() {
310 KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
311 assertNotNull(kafkaCtp);
313 assertTrue(kafkaCtp.validate().isValid());
316 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
317 assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
318 assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
319 assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
320 assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
321 assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
322 assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
323 assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
324 assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
325 assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
328 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
329 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
330 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
331 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
332 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
333 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
334 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
335 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
336 assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
337 kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
340 String[][] kafkaProperties0 = {
342 "bootstrap.servers", "localhost:9092"
347 kafkaCtp.setBootstrapServers(null);
348 kafkaCtp.setKafkaProperties(kafkaProperties0);
349 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
352 String[][] kafkaProperties1 = {
354 "bootstrap.servers", "localhost:9999"
359 kafkaCtp = new KafkaCarrierTechnologyParameters();
360 kafkaCtp.setKafkaProperties(kafkaProperties1);
361 assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
364 String[][] kafkaProperties2 = {
366 "bootstrap.servers", "localhost:8888"
371 kafkaCtp = new KafkaCarrierTechnologyParameters();
372 kafkaCtp.setBootstrapServers("localhost:9092");
373 kafkaCtp.setKafkaProperties(kafkaProperties2);
374 assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
377 String[][] kafkaProperties3 = {
379 "bootstrap.servers", "localhost:5555"
384 kafkaCtp = new KafkaCarrierTechnologyParameters();
385 kafkaCtp.setBootstrapServers("localhost:7777");
386 kafkaCtp.setKafkaProperties(kafkaProperties3);
387 assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));