2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Samsung. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.plugins.event.carrier.kafka;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertTrue;
29 import java.util.Properties;
31 import org.junit.Test;
33 public class KafkaCarrierTechnologyParametersTest {
35 public void testKafkaCarrierTechnologyParameters() {
36 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
37 assertNotNull(kafkaCarrierTechnologyParameters);
39 assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
43 public void testGetKafkaProducerProperties() {
44 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
46 Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
47 assertNotNull(kafkaProducerProperties);
48 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
49 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
50 assertEquals(null, kafkaProducerProperties.get("group.id"));
51 assertEquals(null, kafkaProducerProperties.get("Property0"));
52 assertEquals(null, kafkaProducerProperties.get("Property1"));
53 assertEquals(null, kafkaProducerProperties.get("Property2"));
56 String[][] kafkaProperties = {
66 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
67 kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
68 assertNotNull(kafkaProducerProperties);
69 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
70 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
71 assertEquals(null, kafkaProducerProperties.get("group.id"));
72 assertEquals("Value0", kafkaProducerProperties.get("Property0"));
73 assertEquals("Value1", kafkaProducerProperties.get("Property1"));
74 assertEquals(null, kafkaProducerProperties.get("Property2"));
78 public void testGetKafkaConsumerProperties() {
79 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
81 Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
82 assertNotNull(kafkaConsumerProperties);
83 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
84 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
85 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
86 assertEquals(null, kafkaConsumerProperties.get("Property0"));
87 assertEquals(null, kafkaConsumerProperties.get("Property1"));
88 assertEquals(null, kafkaConsumerProperties.get("Property2"));
91 String[][] kafkaProperties = {
101 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
102 kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
103 assertNotNull(kafkaConsumerProperties);
104 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
105 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
106 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
107 assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
108 assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
109 assertEquals(null, kafkaConsumerProperties.get("Property2"));
113 public void testValidate() {
114 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
115 assertNotNull(kafkaCarrierTechnologyParameters);
117 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
119 String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
120 kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
121 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
122 kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
123 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
125 origStringValue = kafkaCarrierTechnologyParameters.getAcks();
126 kafkaCarrierTechnologyParameters.setAcks(" ");
127 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
128 kafkaCarrierTechnologyParameters.setAcks(origStringValue);
129 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
131 origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
132 kafkaCarrierTechnologyParameters.setGroupId(" ");
133 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
134 kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
135 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
137 origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
138 kafkaCarrierTechnologyParameters.setProducerTopic(" ");
139 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
140 kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
141 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
143 origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
144 kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
145 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
146 kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
147 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
149 int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
150 kafkaCarrierTechnologyParameters.setRetries(-1);
151 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
152 kafkaCarrierTechnologyParameters.setRetries(origIntValue);
153 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
155 origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
156 kafkaCarrierTechnologyParameters.setBatchSize(-1);
157 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
158 kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
159 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
161 origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
162 kafkaCarrierTechnologyParameters.setLingerTime(-1);
163 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
164 kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
165 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
167 long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
168 kafkaCarrierTechnologyParameters.setBufferMemory(-1);
169 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
170 kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
171 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
173 origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
174 kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
175 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
176 kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
177 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
179 origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
180 kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
181 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
182 kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
183 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
185 origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
186 kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
187 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
188 kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
189 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
191 origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
192 kafkaCarrierTechnologyParameters.setKeySerializer(" ");
193 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
194 kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
195 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
197 origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
198 kafkaCarrierTechnologyParameters.setValueSerializer(" ");
199 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
200 kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
201 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
203 origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
204 kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
205 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
206 kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
207 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
209 origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
210 kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
211 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
212 kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
213 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
215 String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
216 kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
217 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
218 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
219 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
221 kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
222 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
223 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
224 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
226 String[] blankStringList = { null, "" };
227 kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
228 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
229 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
230 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
232 String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
233 kafkaCarrierTechnologyParameters.setKafkaProperties(null);
234 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
235 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
236 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
238 kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
239 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
240 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
241 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
243 // @formatter:offkafkaCarrierTechnologyParameters
244 String[][] kafkaProperties0 = {
251 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
252 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
253 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
254 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
257 String[][] kafkaProperties1 = {
264 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
265 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
266 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
267 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
270 String[][] kafkaProperties2 = {
277 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
278 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
279 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
280 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
283 String[][] kafkaProperties3 = {
285 "Property1", "Value0", "Value1"
290 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
291 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
292 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
293 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
297 public void testExplicitImplicit() {
298 KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
299 assertNotNull(kafkaCtp);
301 assertTrue(kafkaCtp.validate().isValid());
304 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
305 assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
306 assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
307 assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
308 assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
309 assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
310 assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
311 assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
312 assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
313 assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
316 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
317 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
318 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
319 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
320 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
321 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
322 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
323 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
324 assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
325 kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
328 String[][] kafkaProperties0 = {
330 "bootstrap.servers", "localhost:9092"
335 kafkaCtp.setBootstrapServers(null);
336 kafkaCtp.setKafkaProperties(kafkaProperties0);
337 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
340 String[][] kafkaProperties1 = {
342 "bootstrap.servers", "localhost:9999"
347 kafkaCtp = new KafkaCarrierTechnologyParameters();
348 kafkaCtp.setKafkaProperties(kafkaProperties1);
349 assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
352 String[][] kafkaProperties2 = {
354 "bootstrap.servers", "localhost:8888"
359 kafkaCtp = new KafkaCarrierTechnologyParameters();
360 kafkaCtp.setBootstrapServers("localhost:9092");
361 kafkaCtp.setKafkaProperties(kafkaProperties2);
362 assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
365 String[][] kafkaProperties3 = {
367 "bootstrap.servers", "localhost:5555"
372 kafkaCtp = new KafkaCarrierTechnologyParameters();
373 kafkaCtp.setBootstrapServers("localhost:7777");
374 kafkaCtp.setKafkaProperties(kafkaProperties3);
375 assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));