2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Samsung. All rights reserved.
4 * Modifications Copyright (C) 2019,2023 Nordix Foundation.
5 * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.plugins.event.carrier.kafka;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertTrue;
30 import java.util.Properties;
31 import org.junit.Test;
33 public class KafkaCarrierTechnologyParametersTest {
35 public void testKafkaCarrierTechnologyParameters() {
36 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
37 assertNotNull(kafkaCarrierTechnologyParameters);
39 assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
43 public void testGetKafkaProducerProperties() {
44 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
46 Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
47 assertNotNull(kafkaProducerProperties);
48 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
49 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
50 assertEquals(null, kafkaProducerProperties.get("group.id"));
51 assertEquals(null, kafkaProducerProperties.get("Property0"));
52 assertEquals(null, kafkaProducerProperties.get("Property1"));
53 assertEquals(null, kafkaProducerProperties.get("Property2"));
56 String[][] kafkaProperties = {
66 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
67 kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
68 assertNotNull(kafkaProducerProperties);
69 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
70 assertEquals("1", kafkaProducerProperties.get("linger.ms"));
71 assertEquals(null, kafkaProducerProperties.get("group.id"));
72 assertEquals("Value0", kafkaProducerProperties.get("Property0"));
73 assertEquals("Value1", kafkaProducerProperties.get("Property1"));
74 assertEquals(null, kafkaProducerProperties.get("Property2"));
78 public void testGetKafkaConsumerProperties() {
79 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
81 Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
82 assertNotNull(kafkaConsumerProperties);
83 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
84 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
85 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
86 assertEquals(null, kafkaConsumerProperties.get("Property0"));
87 assertEquals(null, kafkaConsumerProperties.get("Property1"));
88 assertEquals(null, kafkaConsumerProperties.get("Property2"));
91 String[][] kafkaProperties = {
101 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
102 kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
103 assertNotNull(kafkaConsumerProperties);
104 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
105 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
106 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
107 assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
108 assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
109 assertEquals(null, kafkaConsumerProperties.get("Property2"));
113 public void testValidate() {
114 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
115 assertNotNull(kafkaCarrierTechnologyParameters);
117 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
119 String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
120 kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
121 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
122 kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
123 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
125 origStringValue = kafkaCarrierTechnologyParameters.getAcks();
126 kafkaCarrierTechnologyParameters.setAcks(" ");
127 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
128 kafkaCarrierTechnologyParameters.setAcks(origStringValue);
129 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
131 origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
132 kafkaCarrierTechnologyParameters.setGroupId(" ");
133 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
134 kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
135 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
137 origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
138 kafkaCarrierTechnologyParameters.setProducerTopic(" ");
139 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
140 kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
141 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
143 int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
144 kafkaCarrierTechnologyParameters.setRetries(-1);
145 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
146 kafkaCarrierTechnologyParameters.setRetries(origIntValue);
147 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
149 origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
150 kafkaCarrierTechnologyParameters.setBatchSize(-1);
151 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
152 kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
153 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
155 origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
156 kafkaCarrierTechnologyParameters.setLingerTime(-1);
157 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
158 kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
159 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
161 long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
162 kafkaCarrierTechnologyParameters.setBufferMemory(-1);
163 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
164 kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
165 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
167 origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
168 kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
169 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
170 kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
171 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
173 origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
174 kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
175 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
176 kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
177 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
179 origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
180 kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
181 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
182 kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
183 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
185 origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
186 kafkaCarrierTechnologyParameters.setKeySerializer(" ");
187 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
188 kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
189 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
191 origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
192 kafkaCarrierTechnologyParameters.setValueSerializer(" ");
193 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
194 kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
195 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
197 origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
198 kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
199 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
200 kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
201 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
203 origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
204 kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
205 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
206 kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
207 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
209 String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
210 kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
211 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
212 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
213 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
215 kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
216 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
217 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
218 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
220 String[] blankStringList = { null, "" };
221 kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
222 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
223 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
224 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
226 String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
227 kafkaCarrierTechnologyParameters.setKafkaProperties(null);
228 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
229 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
230 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
232 kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
233 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
234 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
235 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
237 // @formatter:offkafkaCarrierTechnologyParameters
238 String[][] kafkaProperties0 = {
245 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
246 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
247 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
248 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
251 String[][] kafkaProperties1 = {
258 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
259 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
260 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
261 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
264 String[][] kafkaProperties2 = {
271 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
272 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
275 String[][] kafkaPropertiesWithEmptyValue = {
282 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue);
283 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
285 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
286 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
289 String[][] kafkaProperties3 = {
291 "Property1", "Value0", "Value1"
296 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
297 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
298 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
299 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
303 public void testExplicitImplicit() {
304 KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
305 assertNotNull(kafkaCtp);
307 assertTrue(kafkaCtp.validate().isValid());
310 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
311 assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
312 assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
313 assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
314 assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
315 assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
316 assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
317 assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
318 assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
319 assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
322 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
323 kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
324 assertEquals("org.apache.kafka.common.serialization.StringSerializer",
325 kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
326 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
327 kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
328 assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
329 kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
332 String[][] kafkaProperties0 = {
334 "bootstrap.servers", "localhost:9092"
339 kafkaCtp.setBootstrapServers(null);
340 kafkaCtp.setKafkaProperties(kafkaProperties0);
341 assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
344 String[][] kafkaProperties1 = {
346 "bootstrap.servers", "localhost:9999"
351 kafkaCtp = new KafkaCarrierTechnologyParameters();
352 kafkaCtp.setKafkaProperties(kafkaProperties1);
353 assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
356 String[][] kafkaProperties2 = {
358 "bootstrap.servers", "localhost:8888"
363 kafkaCtp = new KafkaCarrierTechnologyParameters();
364 kafkaCtp.setBootstrapServers("localhost:9092");
365 kafkaCtp.setKafkaProperties(kafkaProperties2);
366 assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
369 String[][] kafkaProperties3 = {
371 "bootstrap.servers", "localhost:5555"
376 kafkaCtp = new KafkaCarrierTechnologyParameters();
377 kafkaCtp.setBootstrapServers("localhost:7777");
378 kafkaCtp.setKafkaProperties(kafkaProperties3);
379 assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));