2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Samsung. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertTrue;
28 import java.util.Properties;
30 import org.junit.Test;
32 public class KafkaCarrierTechnologyParametersTest {
34 public void testKafkaCarrierTechnologyParameters() {
35 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
36 assertNotNull(kafkaCarrierTechnologyParameters);
38 assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
42 public void testGetKafkaProducerProperties() {
43 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
45 Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
46 assertNotNull(kafkaProducerProperties);
47 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
48 assertEquals(1, kafkaProducerProperties.get("linger.ms"));
49 assertEquals(null, kafkaProducerProperties.get("group.id"));
50 assertEquals(null, kafkaProducerProperties.get("Property0"));
51 assertEquals(null, kafkaProducerProperties.get("Property1"));
52 assertEquals(null, kafkaProducerProperties.get("Property2"));
55 String[][] kafkaProperties = {
65 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
66 kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
67 assertNotNull(kafkaProducerProperties);
68 assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
69 assertEquals(1, kafkaProducerProperties.get("linger.ms"));
70 assertEquals(null, kafkaProducerProperties.get("group.id"));
71 assertEquals("Value0", kafkaProducerProperties.get("Property0"));
72 assertEquals("Value1", kafkaProducerProperties.get("Property1"));
73 assertEquals(null, kafkaProducerProperties.get("Property2"));
77 public void testGetKafkaConsumerProperties() {
78 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
80 Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
81 assertNotNull(kafkaConsumerProperties);
82 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
83 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
84 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
85 assertEquals(null, kafkaConsumerProperties.get("Property0"));
86 assertEquals(null, kafkaConsumerProperties.get("Property1"));
87 assertEquals(null, kafkaConsumerProperties.get("Property2"));
90 String[][] kafkaProperties = {
100 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
101 kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
102 assertNotNull(kafkaConsumerProperties);
103 assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
104 assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
105 assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
106 assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
107 assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
108 assertEquals(null, kafkaConsumerProperties.get("Property2"));
112 public void testValidate() {
113 KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
114 assertNotNull(kafkaCarrierTechnologyParameters);
116 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
118 String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
119 kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
120 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
121 kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
122 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
124 origStringValue = kafkaCarrierTechnologyParameters.getAcks();
125 kafkaCarrierTechnologyParameters.setAcks(" ");
126 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
127 kafkaCarrierTechnologyParameters.setAcks(origStringValue);
128 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
130 origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
131 kafkaCarrierTechnologyParameters.setGroupId(" ");
132 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
133 kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
134 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
136 origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
137 kafkaCarrierTechnologyParameters.setProducerTopic(" ");
138 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
139 kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
140 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
142 origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
143 kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
144 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
145 kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
146 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
148 int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
149 kafkaCarrierTechnologyParameters.setRetries(-1);
150 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
151 kafkaCarrierTechnologyParameters.setRetries(origIntValue);
152 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
154 origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
155 kafkaCarrierTechnologyParameters.setBatchSize(-1);
156 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
157 kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
158 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
160 origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
161 kafkaCarrierTechnologyParameters.setLingerTime(-1);
162 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
163 kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
164 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
166 long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
167 kafkaCarrierTechnologyParameters.setBufferMemory(-1);
168 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
169 kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
170 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
172 origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
173 kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
174 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
175 kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
176 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
178 origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
179 kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
180 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
181 kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
182 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
184 origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
185 kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
186 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
187 kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
188 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
190 origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
191 kafkaCarrierTechnologyParameters.setKeySerializer(" ");
192 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
193 kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
194 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
196 origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
197 kafkaCarrierTechnologyParameters.setValueSerializer(" ");
198 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
199 kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
200 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
202 origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
203 kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
204 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
205 kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
206 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
208 origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
209 kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
210 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
211 kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
212 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
214 String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
215 kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
216 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
217 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
218 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
220 kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
221 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
222 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
223 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
225 String[] blankStringList = {null, ""};
226 kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
227 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
228 kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
229 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
231 String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
232 kafkaCarrierTechnologyParameters.setKafkaProperties(null);
233 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
234 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
235 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
237 kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
238 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
239 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
240 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
243 String[][] kafkaProperties0 = {
250 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
251 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
252 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
253 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
256 String[][] kafkaProperties1 = {
263 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
264 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
265 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
266 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
269 String[][] kafkaProperties2 = {
276 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
277 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
278 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
279 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
282 String[][] kafkaProperties3 = {
284 "Property1", "Value0", "Value1"
289 kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
290 assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
291 kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
292 assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());