2f5405ba8ce3214ac80237644f45a075564a9d8e
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Samsung. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertTrue;
27
28 import java.util.Properties;
29
30 import org.junit.Test;
31
32 public class KafkaCarrierTechnologyParametersTest {
33     @Test
34     public void testKafkaCarrierTechnologyParameters() {
35         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
36         assertNotNull(kafkaCarrierTechnologyParameters);
37
38         assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
39     }
40
41     @Test
42     public void testGetKafkaProducerProperties() {
43         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
44
45         Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
46         assertNotNull(kafkaProducerProperties);
47         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
48         assertEquals(1, kafkaProducerProperties.get("linger.ms"));
49         assertEquals(null, kafkaProducerProperties.get("group.id"));
50         assertEquals(null, kafkaProducerProperties.get("Property0"));
51         assertEquals(null, kafkaProducerProperties.get("Property1"));
52         assertEquals(null, kafkaProducerProperties.get("Property2"));
53
54         // @formatter:off
55         String[][] kafkaProperties = {
56             {
57                 "Property0", "Value0"
58             },
59             {
60                 "Property1", "Value1"
61             }
62         };
63         // @formatter:on
64
65         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
66         kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
67         assertNotNull(kafkaProducerProperties);
68         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
69         assertEquals(1, kafkaProducerProperties.get("linger.ms"));
70         assertEquals(null, kafkaProducerProperties.get("group.id"));
71         assertEquals("Value0", kafkaProducerProperties.get("Property0"));
72         assertEquals("Value1", kafkaProducerProperties.get("Property1"));
73         assertEquals(null, kafkaProducerProperties.get("Property2"));
74     }
75
76     @Test
77     public void testGetKafkaConsumerProperties() {
78         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
79
80         Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
81         assertNotNull(kafkaConsumerProperties);
82         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
83         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
84         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
85         assertEquals(null, kafkaConsumerProperties.get("Property0"));
86         assertEquals(null, kafkaConsumerProperties.get("Property1"));
87         assertEquals(null, kafkaConsumerProperties.get("Property2"));
88
89         // @formatter:off
90         String[][] kafkaProperties = {
91             {
92                 "Property0", "Value0"
93             },
94             {
95                 "Property1", "Value1"
96             }
97         };
98         // @formatter:on
99
100         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
101         kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
102         assertNotNull(kafkaConsumerProperties);
103         assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
104         assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
105         assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
106         assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
107         assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
108         assertEquals(null, kafkaConsumerProperties.get("Property2"));
109     }
110
111     @Test
112     public void testValidate() {
113         KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
114         assertNotNull(kafkaCarrierTechnologyParameters);
115
116         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
117
118         String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
119         kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
120         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
121         kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
122         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
123
124         origStringValue = kafkaCarrierTechnologyParameters.getAcks();
125         kafkaCarrierTechnologyParameters.setAcks(" ");
126         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
127         kafkaCarrierTechnologyParameters.setAcks(origStringValue);
128         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
129
130         origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
131         kafkaCarrierTechnologyParameters.setGroupId(" ");
132         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
133         kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
134         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
135
136         origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
137         kafkaCarrierTechnologyParameters.setProducerTopic(" ");
138         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
139         kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
140         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
141
142         origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
143         kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
144         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
145         kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
146         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
147
148         int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
149         kafkaCarrierTechnologyParameters.setRetries(-1);
150         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
151         kafkaCarrierTechnologyParameters.setRetries(origIntValue);
152         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
153
154         origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
155         kafkaCarrierTechnologyParameters.setBatchSize(-1);
156         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
157         kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
158         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
159
160         origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
161         kafkaCarrierTechnologyParameters.setLingerTime(-1);
162         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
163         kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
164         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
165
166         long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
167         kafkaCarrierTechnologyParameters.setBufferMemory(-1);
168         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
169         kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
170         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
171
172         origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
173         kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
174         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
175         kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
176         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
177
178         origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
179         kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
180         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
181         kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
182         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
183
184         origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
185         kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
186         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
187         kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
188         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
189
190         origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
191         kafkaCarrierTechnologyParameters.setKeySerializer(" ");
192         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
193         kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
194         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
195
196         origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
197         kafkaCarrierTechnologyParameters.setValueSerializer(" ");
198         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
199         kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
200         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
201
202         origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
203         kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
204         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
205         kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
206         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
207
208         origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
209         kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
210         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
211         kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
212         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
213
214         String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
215         kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
216         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
217         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
218         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
219
220         kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
221         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
222         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
223         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
224
225         String[] blankStringList = {null, ""};
226         kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
227         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
228         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
229         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
230
231         String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
232         kafkaCarrierTechnologyParameters.setKafkaProperties(null);
233         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
234         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
235         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
236
237         kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
238         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
239         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
240         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
241
242         // @formatter:off
243         String[][] kafkaProperties0 = {
244             {
245                 null, "Value0"
246             }
247         };
248         // @formatter:on
249
250         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
251         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
252         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
253         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
254
255         // @formatter:off
256         String[][] kafkaProperties1 = {
257             {
258                 "Property1", null
259             }
260         };
261         // @formatter:on
262
263         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
264         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
265         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
266         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
267
268         // @formatter:off
269         String[][] kafkaProperties2 = {
270             {
271                 "Property1", null
272             }
273         };
274         // @formatter:on
275
276         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
277         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
278         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
279         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
280
281         // @formatter:off
282         String[][] kafkaProperties3 = {
283             {
284                 "Property1", "Value0", "Value1"
285             }
286         };
287         // @formatter:on
288
289         kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
290         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
291         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
292         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
293
294     }
295 }