2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Samsung. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.plugins.event.carrier.kafka;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertTrue;
29 import java.util.Properties;
30 import org.junit.Test;
32 public class KafkaCarrierTechnologyParametersTest {
34 public void testKafkaCarrierTechnologyParameters() {
35 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
36 assertNotNull(kafkaCarrierTechnologyParameters);
38 assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
42 public void testGetKafkaProducerProperties() {
43 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
45 Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
46 assertNotNull(kafkaProducerProperties);
47 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
48 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
49 assertEquals(null, kafkaProducerProperties.get("group.id"));
50 assertEquals(null, kafkaProducerProperties.get("Property0"));
51 assertEquals(null, kafkaProducerProperties.get("Property1"));
52 assertEquals(null, kafkaProducerProperties.get("Property2"));
55 String[][] kafkaProperties = {
65 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
66 kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
67 assertNotNull(kafkaProducerProperties);
68 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
69 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
70 assertEquals(null, kafkaProducerProperties.get("group.id"));
71 assertEquals("Value0", kafkaProducerProperties.get("Property0"));
72 assertEquals("Value1", kafkaProducerProperties.get("Property1"));
73 assertEquals(null, kafkaProducerProperties.get("Property2"));
77 public void testGetKafkaConsumerProperties() {
78 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
80 Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
81 assertNotNull(kafkaConsumerProperties);
82 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
83 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
84 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
85 assertEquals(null, kafkaConsumerProperties.get("Property0"));
86 assertEquals(null, kafkaConsumerProperties.get("Property1"));
87 assertEquals(null, kafkaConsumerProperties.get("Property2"));
90 String[][] kafkaProperties = {
100 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
101 kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
102 assertNotNull(kafkaConsumerProperties);
103 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
104 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
105 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
106 assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
107 assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
108 assertEquals(null, kafkaConsumerProperties.get("Property2"));
112 public void testValidate() {
113 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
114 assertNotNull(kafkaCarrierTechnologyParameters);
116 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
118 String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
119 kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
120 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
121 kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
122 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
124 origStringValue = kafkaCarrierTechnologyParameters.getAcks();
125 kafkaCarrierTechnologyParameters.setAcks(" ");
126 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
127 kafkaCarrierTechnologyParameters.setAcks(origStringValue);
128 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
130 origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
131 kafkaCarrierTechnologyParameters.setGroupId(" ");
132 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
133 kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
134 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
136 origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
137 kafkaCarrierTechnologyParameters.setProducerTopic(" ");
138 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
139 kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
140 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
142 origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
143 kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
144 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
145 kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
146 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
148 int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
149 kafkaCarrierTechnologyParameters.setRetries(-1);
150 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
151 kafkaCarrierTechnologyParameters.setRetries(origIntValue);
152 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
154 origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
155 kafkaCarrierTechnologyParameters.setBatchSize(-1);
156 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
157 kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
158 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
160 origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
161 kafkaCarrierTechnologyParameters.setLingerTime(-1);
162 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
163 kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
164 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
166 long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
167 kafkaCarrierTechnologyParameters.setBufferMemory(-1);
168 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
169 kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
170 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
172 origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
173 kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
174 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
175 kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
176 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
178 origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
179 kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
180 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
181 kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
182 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
184 origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
185 kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
186 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
187 kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
188 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
190 origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
191 kafkaCarrierTechnologyParameters.setKeySerializer(" ");
192 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
193 kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
194 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
196 origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
197 kafkaCarrierTechnologyParameters.setValueSerializer(" ");
198 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
199 kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
200 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
202 origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
203 kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
204 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
205 kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
206 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
208 origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
209 kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
210 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
211 kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
212 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
214 String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
215 kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
216 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
217 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
218 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
220 kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
221 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
222 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
223 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
225 String[] blankStringList = { null, "" };
226 kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
227 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
228 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
229 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
231 String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
232 kafkaCarrierTechnologyParameters.setKafkaProperties(null);
233 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
234 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
235 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
237 kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
238 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
239 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
240 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
242 // @formatter:offkafkaCarrierTechnologyParameters
243 String[][] kafkaProperties0 = {
250 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
251 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
252 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
253 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
256 String[][] kafkaProperties1 = {
263 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
264 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
265 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
266 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
269 String[][] kafkaProperties2 = {
276 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
277 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
278 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
279 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
282 String[][] kafkaProperties3 = {
284 "Property1", "Value0", "Value1"
289 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
290 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
291 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
292 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
296 public void testExplicitImplicit() {
297 KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
298 assertNotNull(kafkaCtp);
300 assertTrue(kafkaCtp.validate().isValid());
303 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
304 assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
305 assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
306 assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
307 assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
308 assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
309 assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
310 assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
311 assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
312 assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
315 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
316 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
317 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
318 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
319 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
320 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
321 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
322 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
323 assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
324 kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
327 String[][] kafkaProperties0 = {
329 "bootstrap.servers", "localhost:9092"
334 kafkaCtp.setBootstrapServers(null);
335 kafkaCtp.setKafkaProperties(kafkaProperties0);
336 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
339 String[][] kafkaProperties1 = {
341 "bootstrap.servers", "localhost:9999"
346 kafkaCtp = new KafkaCarrierTechnologyParameters();
347 kafkaCtp.setKafkaProperties(kafkaProperties1);
348 assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
351 String[][] kafkaProperties2 = {
353 "bootstrap.servers", "localhost:8888"
358 kafkaCtp = new KafkaCarrierTechnologyParameters();
359 kafkaCtp.setBootstrapServers("localhost:9092");
360 kafkaCtp.setKafkaProperties(kafkaProperties2);
361 assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
364 String[][] kafkaProperties3 = {
366 "bootstrap.servers", "localhost:5555"
371 kafkaCtp = new KafkaCarrierTechnologyParameters();
372 kafkaCtp.setBootstrapServers("localhost:7777");
373 kafkaCtp.setKafkaProperties(kafkaProperties3);
374 assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));