Changes for checkstyle 8.32
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-kafka / src / main / java / org / onap / policy / apex / plugins / event / carrier / kafka / KafkaCarrierTechnologyParameters.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.kafka;
23
24 import java.time.Duration;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Properties;
28 import lombok.Getter;
29 import lombok.Setter;
30 import org.apache.commons.lang3.StringUtils;
31 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
32 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
33 import org.onap.policy.common.parameters.GroupValidationResult;
34 import org.onap.policy.common.parameters.ValidationStatus;
35 import org.onap.policy.common.parameters.annotations.Min;
36 import org.onap.policy.common.parameters.annotations.NotBlank;
37
38 /**
39  * Apex parameters for Kafka as an event carrier technology.
40  *
41  * @author Liam Fallon (liam.fallon@ericsson.com)
42  */
43 @Getter
44 @Setter
45 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
46     // @formatter:off
47     /** The label of this carrier technology. */
48     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
49
50     /** The producer plugin class for the Kafka carrier technology. */
51     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getName();
52
53     /** The consumer plugin class for the Kafka carrier technology. */
54     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getName();
55
56     // Repeated strings in messages
57     private static final String ENTRY = "entry ";
58     private static final String KAFKA_PROPERTIES = "kafkaProperties";
59
60     // Default parameter values
61     private static final String   DEFAULT_ACKS             = "all";
62     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
63     private static final int      DEFAULT_RETRIES          = 0;
64     private static final int      DEFAULT_BATCH_SIZE       = 16384;
65     private static final int      DEFAULT_LINGER_TIME      = 1;
66     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
67     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
68     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
69     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
70     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
71     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
72     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
73     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
74     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
75     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
76     private static final String   DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getName();
77
78     // Parameter property map tokens
79     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
80     private static final String PROPERTY_ACKS               = "acks";
81     private static final String PROPERTY_RETRIES            = "retries";
82     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
83     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
84     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
85     private static final String PROPERTY_GROUP_ID           = "group.id";
86     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
87     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
88     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
89     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
90     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
91     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
92     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
93     private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
94
95     // kafka carrier parameters
96     @NotBlank
97     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
98     @NotBlank
99     private String   acks              = DEFAULT_ACKS;
100     @Min(value = 0)
101     private int      retries           = DEFAULT_RETRIES;
102     @Min(value = 0)
103     private int      batchSize         = DEFAULT_BATCH_SIZE;
104     @Min(value = 0)
105     private int      lingerTime        = DEFAULT_LINGER_TIME;
106     @Min(value = 0)
107     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
108     @NotBlank
109     private String   groupId           = DEFAULT_GROUP_ID;
110     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
111     @Min(value = 0)
112     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
113     @Min(value = 0)
114     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
115     @NotBlank
116     private String   producerTopic     = DEFAULT_PROD_TOPIC;
117     @Min(value = 0)
118     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
119     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
120     @NotBlank
121     private String   keySerializer     = DEFAULT_STRING_SERZER;
122     @NotBlank
123     private String   valueSerializer   = DEFAULT_STRING_SERZER;
124     @NotBlank
125     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
126     @NotBlank
127     private String   valueDeserializer = DEFAULT_STRING_DESZER;
128     @NotBlank
129     private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
130
131     // All Kafka properties can be specified as an array of key-value pairs
132     private String[][] kafkaProperties = null;
133
134     // @formatter:on
135
136     /**
137      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
138      * service.
139      */
140     public KafkaCarrierTechnologyParameters() {
141         super();
142
143         // Set the carrier technology properties for the kafka carrier technology
144         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
145         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
146         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
147     }
148
149     /**
150      * Gets the kafka producer properties.
151      *
152      * @return the kafka producer properties
153      */
154     public Properties getKafkaProducerProperties() {
155         final Properties retKafkaProps = new Properties();
156
157         // Add properties from the Kafka property array
158         if (kafkaProperties != null) {
159             for (int i = 0; i < kafkaProperties.length; i++) {
160                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
161             }
162         }
163
164         // @formatter:off
165         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
166         putExplicitProperty(retKafkaProps, PROPERTY_ACKS,              acks,             DEFAULT_ACKS);
167         putExplicitProperty(retKafkaProps, PROPERTY_RETRIES,           retries,          DEFAULT_RETRIES);
168         putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE,        batchSize,        DEFAULT_BATCH_SIZE);
169         putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME,       lingerTime,       DEFAULT_LINGER_TIME);
170         putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY,     bufferMemory,     DEFAULT_BUFFER_MEMORY);
171         putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER,    keySerializer,    DEFAULT_STRING_SERZER);
172         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER,  valueSerializer,  DEFAULT_STRING_SERZER);
173         putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
174         // @formatter:on
175
176         return retKafkaProps;
177     }
178
179     /**
180      * Gets the kafka consumer properties.
181      *
182      * @return the kafka consumer properties
183      */
184     public Properties getKafkaConsumerProperties() {
185         final Properties retKafkaProps = new Properties();
186
187         // Add properties from the Kafka property array
188         if (kafkaProperties != null) {
189             for (int i = 0; i < kafkaProperties.length; i++) {
190                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
191             }
192         }
193
194         // @formatter:off
195         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS,  bootstrapServers, DEFAULT_BOOT_SERVERS);
196         putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID,           groupId,          DEFAULT_GROUP_ID);
197         putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
198         putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME,   autoCommitTime,   DEFAULT_AUTO_COMMIT_TIME);
199         putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT,    sessionTimeout,   DEFAULT_SESSION_TIMEOUT);
200         putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER,   keyDeserializer,  DEFAULT_STRING_DESZER);
201         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
202         // @formatter:on
203
204         return retKafkaProps;
205     }
206
207     /**
208      * Gets the consumer topic list.
209      *
210      * @return the consumer topic list
211      */
212     public Collection<String> getConsumerTopicListAsCollection() {
213         return Arrays.asList(consumerTopicList);
214     }
215
216     /**
217      * Gets the consumer poll duration.
218      * @return The poll duration
219      */
220     public Duration getConsumerPollDuration() {
221         return Duration.ofMillis(consumerPollTime);
222     }
223
224     /**
225      * {@inheritDoc}.
226      */
227     @Override
228     public GroupValidationResult validate() {
229         final GroupValidationResult result = super.validate();
230
231         validateConsumerTopicList(result);
232
233         validateKafkaProperties(result);
234
235         return result;
236     }
237
238     /**
239      * Validate the consumer topic list.
240      *
241      * @param result the result of the validation.
242      */
243     private void validateConsumerTopicList(final GroupValidationResult result) {
244         if (consumerTopicList == null || consumerTopicList.length == 0) {
245             result.setResult("consumerTopicList", ValidationStatus.INVALID,
246                     "not specified, must be specified as a list of strings");
247             return;
248         }
249
250         StringBuilder consumerTopicStringBuilder = new StringBuilder();
251         for (final String consumerTopic : consumerTopicList) {
252             if (StringUtils.isBlank(consumerTopic)) {
253                 consumerTopicStringBuilder.append(consumerTopic + "/");
254             }
255         }
256         if (consumerTopicStringBuilder.length() > 0) {
257             result.setResult("consumerTopicList", ValidationStatus.INVALID,
258                     "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
259         }
260     }
261
262     /**
263      * Validate the kafka properties.
264      *
265      * @param result the result of the validation.
266      */
267     private void validateKafkaProperties(final GroupValidationResult result) {
268         // Kafka properties are optional
269         if (kafkaProperties == null || kafkaProperties.length == 0) {
270             return;
271         }
272
273         for (int i = 0; i < kafkaProperties.length; i++) {
274             if (kafkaProperties[i].length != 2) {
275                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
276                         ENTRY + i + " invalid, kafka properties must be name-value pairs");
277             }
278
279             if (StringUtils.isBlank(kafkaProperties[i][0])) {
280                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
281                         ENTRY + i + " invalid, key is null or blank");
282             }
283
284             if (StringUtils.isBlank(kafkaProperties[i][1])) {
285                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
286                         ENTRY + i + " invalid, value is null or blank");
287             }
288         }
289     }
290
291     /**
292      * Put a property into the properties if it is not already defined and is not the default value.
293      *
294      * @param returnKafkaProperties the properties to set the value in
295      * @param property the property to put
296      * @param value the value of the property to put
297      * @param defaultValue the default value of the property to put
298      */
299     private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
300             final Object value, final Object defaultValue) {
301
302         // Check if the property is already in the properties
303         if (!returnKafkaProperties.containsKey(property)) {
304             // Not found, so add it
305             returnKafkaProperties.setProperty(property, value.toString());
306         } else {
307             // Found, only overwrite if the property does not have the default value
308             if (value == null) {
309                 returnKafkaProperties.setProperty(property, defaultValue.toString());
310             } else if (!value.toString().contentEquals(defaultValue.toString())) {
311                 returnKafkaProperties.setProperty(property, value.toString());
312             }
313         }
314     }
315 }