Merge "Improve robustness of unit testing"
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-kafka / src / main / java / org / onap / policy / apex / plugins / event / carrier / kafka / KafkaCarrierTechnologyParameters.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.kafka;
23
24 import java.time.Duration;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Properties;
28
29 import lombok.Getter;
30 import lombok.Setter;
31
32 import org.apache.commons.lang3.StringUtils;
33 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
34 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
35 import org.onap.policy.common.parameters.GroupValidationResult;
36 import org.onap.policy.common.parameters.ValidationStatus;
37 import org.onap.policy.common.parameters.annotations.Min;
38 import org.onap.policy.common.parameters.annotations.NotBlank;
39
40 /**
41  * Apex parameters for Kafka as an event carrier technology.
42  *
43  * @author Liam Fallon (liam.fallon@ericsson.com)
44  */
45 @Getter
46 @Setter
47 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
48     // @formatter:off
49     /** The label of this carrier technology. */
50     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
51
52     /** The producer plugin class for the Kafka carrier technology. */
53     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getName();
54
55     /** The consumer plugin class for the Kafka carrier technology. */
56     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getName();
57
58     // Repeated strings in messages
59     private static final String ENTRY = "entry ";
60     private static final String KAFKA_PROPERTIES = "kafkaProperties";
61
62     // Default parameter values
63     private static final String   DEFAULT_ACKS             = "all";
64     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
65     private static final int      DEFAULT_RETRIES          = 0;
66     private static final int      DEFAULT_BATCH_SIZE       = 16384;
67     private static final int      DEFAULT_LINGER_TIME      = 1;
68     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
69     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
70     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
71     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
72     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
73     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
74     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
75     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
76     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
77     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
78     private static final String   DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getName();
79
80     // Parameter property map tokens
81     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
82     private static final String PROPERTY_ACKS               = "acks";
83     private static final String PROPERTY_RETRIES            = "retries";
84     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
85     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
86     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
87     private static final String PROPERTY_GROUP_ID           = "group.id";
88     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
89     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
90     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
91     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
92     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
93     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
94     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
95     private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
96
97     // kafka carrier parameters
98     @NotBlank
99     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
100     @NotBlank
101     private String   acks              = DEFAULT_ACKS;
102     @Min(value = 0)
103     private int      retries           = DEFAULT_RETRIES;
104     @Min(value = 0)
105     private int      batchSize         = DEFAULT_BATCH_SIZE;
106     @Min(value = 0)
107     private int      lingerTime        = DEFAULT_LINGER_TIME;
108     @Min(value = 0)
109     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
110     @NotBlank
111     private String   groupId           = DEFAULT_GROUP_ID;
112     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
113     @Min(value = 0)
114     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
115     @Min(value = 0)
116     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
117     @NotBlank
118     private String   producerTopic     = DEFAULT_PROD_TOPIC;
119     @Min(value = 0)
120     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
121     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
122     @NotBlank
123     private String   keySerializer     = DEFAULT_STRING_SERZER;
124     @NotBlank
125     private String   valueSerializer   = DEFAULT_STRING_SERZER;
126     @NotBlank
127     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
128     @NotBlank
129     private String   valueDeserializer = DEFAULT_STRING_DESZER;
130     @NotBlank
131     private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
132
133     // All Kafka properties can be specified as an array of key-value pairs
134     private String[][] kafkaProperties = null;
135
136     // @formatter:on
137
138     /**
139      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
140      * service.
141      */
142     public KafkaCarrierTechnologyParameters() {
143         super();
144
145         // Set the carrier technology properties for the kafka carrier technology
146         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
147         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
148         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
149     }
150
151     /**
152      * Gets the kafka producer properties.
153      *
154      * @return the kafka producer properties
155      */
156     public Properties getKafkaProducerProperties() {
157         final Properties retKafkaProps = new Properties();
158
159         // Add properties from the Kafka property array
160         if (kafkaProperties != null) {
161             for (int i = 0; i < kafkaProperties.length; i++) {
162                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
163             }
164         }
165
166         // @formatter:off
167         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
168         putExplicitProperty(retKafkaProps, PROPERTY_ACKS,              acks,             DEFAULT_ACKS);
169         putExplicitProperty(retKafkaProps, PROPERTY_RETRIES,           retries,          DEFAULT_RETRIES);
170         putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE,        batchSize,        DEFAULT_BATCH_SIZE);
171         putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME,       lingerTime,       DEFAULT_LINGER_TIME);
172         putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY,     bufferMemory,     DEFAULT_BUFFER_MEMORY);
173         putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER,    keySerializer,    DEFAULT_STRING_SERZER);
174         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER,  valueSerializer,  DEFAULT_STRING_SERZER);
175         putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
176         // @formatter:on
177
178         return retKafkaProps;
179     }
180
181     /**
182      * Gets the kafka consumer properties.
183      *
184      * @return the kafka consumer properties
185      */
186     public Properties getKafkaConsumerProperties() {
187         final Properties retKafkaProps = new Properties();
188
189         // Add properties from the Kafka property array
190         if (kafkaProperties != null) {
191             for (int i = 0; i < kafkaProperties.length; i++) {
192                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
193             }
194         }
195
196         // @formatter:off
197         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS,  bootstrapServers, DEFAULT_BOOT_SERVERS);
198         putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID,           groupId,          DEFAULT_GROUP_ID);
199         putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
200         putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME,   autoCommitTime,   DEFAULT_AUTO_COMMIT_TIME);
201         putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT,    sessionTimeout,   DEFAULT_SESSION_TIMEOUT);
202         putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER,   keyDeserializer,  DEFAULT_STRING_DESZER);
203         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
204         // @formatter:on
205
206         return retKafkaProps;
207     }
208
209     /**
210      * Gets the consumer topic list.
211      *
212      * @return the consumer topic list
213      */
214     public Collection<String> getConsumerTopicListAsCollection() {
215         return Arrays.asList(consumerTopicList);
216     }
217
218     /**
219      * Gets the consumer poll duration.
220      * @return The poll duration
221      */
222     public Duration getConsumerPollDuration() {
223         return Duration.ofMillis(consumerPollTime);
224     }
225
226     /**
227      * {@inheritDoc}.
228      */
229     @Override
230     public GroupValidationResult validate() {
231         final GroupValidationResult result = super.validate();
232
233         validateConsumerTopicList(result);
234
235         validateKafkaProperties(result);
236
237         return result;
238     }
239
240     /**
241      * Validate the consumer topic list.
242      *
243      * @param result the result of the validation.
244      */
245     private void validateConsumerTopicList(final GroupValidationResult result) {
246         if (consumerTopicList == null || consumerTopicList.length == 0) {
247             result.setResult("consumerTopicList", ValidationStatus.INVALID,
248                     "not specified, must be specified as a list of strings");
249             return;
250         }
251
252         StringBuilder consumerTopicStringBuilder = new StringBuilder();
253         for (final String consumerTopic : consumerTopicList) {
254             if (StringUtils.isBlank(consumerTopic)) {
255                 consumerTopicStringBuilder.append(consumerTopic + "/");
256             }
257         }
258         if (consumerTopicStringBuilder.length() > 0) {
259             result.setResult("consumerTopicList", ValidationStatus.INVALID,
260                     "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
261         }
262     }
263
264     /**
265      * Validate the kafka properties.
266      *
267      * @param result the result of the validation.
268      */
269     private void validateKafkaProperties(final GroupValidationResult result) {
270         // Kafka properties are optional
271         if (kafkaProperties == null || kafkaProperties.length == 0) {
272             return;
273         }
274
275         for (int i = 0; i < kafkaProperties.length; i++) {
276             if (kafkaProperties[i].length != 2) {
277                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
278                         ENTRY + i + " invalid, kafka properties must be name-value pairs");
279             }
280
281             if (StringUtils.isBlank(kafkaProperties[i][0])) {
282                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
283                         ENTRY + i + " invalid, key is null or blank");
284             }
285
286             if (StringUtils.isBlank(kafkaProperties[i][1])) {
287                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
288                         ENTRY + i + " invalid, value is null or blank");
289             }
290         }
291     }
292
293     /**
294      * Put a property into the properties if it is not already defined and is not the default value.
295      *
296      * @param returnKafkaProperties the properties to set the value in
297      * @param property the property to put
298      * @param value the value of the property to put
299      * @param defaultValue the default value of the property to put
300      */
301     private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
302             final Object value, final Object defaultValue) {
303
304         // Check if the property is already in the properties
305         if (!returnKafkaProperties.containsKey(property)) {
306             // Not found, so add it
307             returnKafkaProperties.setProperty(property, value.toString());
308         }
309         else {
310             // Found, only overwrite if the property does not have the default value
311             if (value == null) {
312                 returnKafkaProperties.setProperty(property, defaultValue.toString());
313             }
314             else if (!value.toString().contentEquals(defaultValue.toString())) {
315                 returnKafkaProperties.setProperty(property, value.toString());
316             }
317         }
318     }
319 }