eb1f15c936d0abc3527245b480bb3ec114f585f4
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.plugins.event.carrier.kafka;
24
25 import java.time.Duration;
26 import java.util.Arrays;
27 import java.util.Collection;
28 import java.util.Properties;
29 import lombok.Getter;
30 import lombok.Setter;
31 import org.apache.commons.lang3.StringUtils;
32 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
33 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
34 import org.onap.policy.common.parameters.GroupValidationResult;
35 import org.onap.policy.common.parameters.ValidationStatus;
36 import org.onap.policy.common.parameters.annotations.Min;
37 import org.onap.policy.common.parameters.annotations.NotBlank;
38
39 /**
40  * Apex parameters for Kafka as an event carrier technology.
41  *
42  * @author Liam Fallon (liam.fallon@ericsson.com)
43  */
44 @Getter
45 @Setter
46 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
47     // @formatter:off
48     /** The label of this carrier technology. */
49     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
50
51     /** The producer plugin class for the Kafka carrier technology. */
52     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getName();
53
54     /** The consumer plugin class for the Kafka carrier technology. */
55     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getName();
56
57     // Repeated strings in messages
58     private static final String ENTRY = "entry ";
59     private static final String KAFKA_PROPERTIES = "kafkaProperties";
60
61     // Default parameter values
62     private static final String   DEFAULT_ACKS             = "all";
63     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
64     private static final int      DEFAULT_RETRIES          = 0;
65     private static final int      DEFAULT_BATCH_SIZE       = 16384;
66     private static final int      DEFAULT_LINGER_TIME      = 1;
67     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
68     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
69     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
70     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
71     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
72     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
73     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
74     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
75     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
76     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
77     private static final String   DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getName();
78
79     // Parameter property map tokens
80     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
81     private static final String PROPERTY_ACKS               = "acks";
82     private static final String PROPERTY_RETRIES            = "retries";
83     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
84     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
85     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
86     private static final String PROPERTY_GROUP_ID           = "group.id";
87     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
88     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
89     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
90     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
91     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
92     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
93     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
94     private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
95
96     // kafka carrier parameters
97     @NotBlank
98     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
99     @NotBlank
100     private String   acks              = DEFAULT_ACKS;
101     @Min(value = 0)
102     private int      retries           = DEFAULT_RETRIES;
103     @Min(value = 0)
104     private int      batchSize         = DEFAULT_BATCH_SIZE;
105     @Min(value = 0)
106     private int      lingerTime        = DEFAULT_LINGER_TIME;
107     @Min(value = 0)
108     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
109     @NotBlank
110     private String   groupId           = DEFAULT_GROUP_ID;
111     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
112     @Min(value = 0)
113     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
114     @Min(value = 0)
115     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
116     @NotBlank
117     private String   producerTopic     = DEFAULT_PROD_TOPIC;
118     @Min(value = 0)
119     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
120     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
121     @NotBlank
122     private String   keySerializer     = DEFAULT_STRING_SERZER;
123     @NotBlank
124     private String   valueSerializer   = DEFAULT_STRING_SERZER;
125     @NotBlank
126     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
127     @NotBlank
128     private String   valueDeserializer = DEFAULT_STRING_DESZER;
129     @NotBlank
130     private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
131
132     // All Kafka properties can be specified as an array of key-value pairs
133     private String[][] kafkaProperties = null;
134
135     // @formatter:on
136
137     /**
138      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
139      * service.
140      */
141     public KafkaCarrierTechnologyParameters() {
142         super();
143
144         // Set the carrier technology properties for the kafka carrier technology
145         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
146         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
147         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
148     }
149
150     /**
151      * Gets the kafka producer properties.
152      *
153      * @return the kafka producer properties
154      */
155     public Properties getKafkaProducerProperties() {
156         final Properties retKafkaProps = new Properties();
157
158         // Add properties from the Kafka property array
159         if (kafkaProperties != null) {
160             for (int i = 0; i < kafkaProperties.length; i++) {
161                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
162             }
163         }
164
165         // @formatter:off
166         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
167         putExplicitProperty(retKafkaProps, PROPERTY_ACKS,              acks,             DEFAULT_ACKS);
168         putExplicitProperty(retKafkaProps, PROPERTY_RETRIES,           retries,          DEFAULT_RETRIES);
169         putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE,        batchSize,        DEFAULT_BATCH_SIZE);
170         putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME,       lingerTime,       DEFAULT_LINGER_TIME);
171         putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY,     bufferMemory,     DEFAULT_BUFFER_MEMORY);
172         putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER,    keySerializer,    DEFAULT_STRING_SERZER);
173         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER,  valueSerializer,  DEFAULT_STRING_SERZER);
174         putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
175         // @formatter:on
176
177         return retKafkaProps;
178     }
179
180     /**
181      * Gets the kafka consumer properties.
182      *
183      * @return the kafka consumer properties
184      */
185     public Properties getKafkaConsumerProperties() {
186         final Properties retKafkaProps = new Properties();
187
188         // Add properties from the Kafka property array
189         if (kafkaProperties != null) {
190             for (int i = 0; i < kafkaProperties.length; i++) {
191                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
192             }
193         }
194
195         // @formatter:off
196         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS,  bootstrapServers, DEFAULT_BOOT_SERVERS);
197         putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID,           groupId,          DEFAULT_GROUP_ID);
198         putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
199         putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME,   autoCommitTime,   DEFAULT_AUTO_COMMIT_TIME);
200         putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT,    sessionTimeout,   DEFAULT_SESSION_TIMEOUT);
201         putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER,   keyDeserializer,  DEFAULT_STRING_DESZER);
202         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
203         // @formatter:on
204
205         return retKafkaProps;
206     }
207
208     /**
209      * Gets the consumer topic list.
210      *
211      * @return the consumer topic list
212      */
213     public Collection<String> getConsumerTopicListAsCollection() {
214         return Arrays.asList(consumerTopicList);
215     }
216
217     /**
218      * Gets the consumer poll duration.
219      * @return The poll duration
220      */
221     public Duration getConsumerPollDuration() {
222         return Duration.ofMillis(consumerPollTime);
223     }
224
225     /**
226      * {@inheritDoc}.
227      */
228     @Override
229     public GroupValidationResult validate() {
230         final GroupValidationResult result = super.validate();
231
232         validateConsumerTopicList(result);
233
234         validateKafkaProperties(result);
235
236         return result;
237     }
238
239     /**
240      * Validate the consumer topic list.
241      *
242      * @param result the result of the validation.
243      */
244     private void validateConsumerTopicList(final GroupValidationResult result) {
245         if (consumerTopicList == null || consumerTopicList.length == 0) {
246             result.setResult("consumerTopicList", ValidationStatus.INVALID,
247                     "not specified, must be specified as a list of strings");
248             return;
249         }
250
251         StringBuilder consumerTopicStringBuilder = new StringBuilder();
252         for (final String consumerTopic : consumerTopicList) {
253             if (StringUtils.isBlank(consumerTopic)) {
254                 consumerTopicStringBuilder.append(consumerTopic + "/");
255             }
256         }
257         if (consumerTopicStringBuilder.length() > 0) {
258             result.setResult("consumerTopicList", ValidationStatus.INVALID,
259                     "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
260         }
261     }
262
263     /**
264      * Validate the kafka properties.
265      *
266      * @param result the result of the validation.
267      */
268     private void validateKafkaProperties(final GroupValidationResult result) {
269         // Kafka properties are optional
270         if (kafkaProperties == null || kafkaProperties.length == 0) {
271             return;
272         }
273
274         for (int i = 0; i < kafkaProperties.length; i++) {
275             if (kafkaProperties[i].length != 2) {
276                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
277                         ENTRY + i + " invalid, kafka properties must be name-value pairs");
278             }
279
280             if (StringUtils.isBlank(kafkaProperties[i][0])) {
281                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
282                         ENTRY + i + " invalid, key is null or blank");
283             }
284
285             // the value of a property has to be specified as empty in some cases, but should never be null.
286             if (null == kafkaProperties[i][1]) {
287                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
288                         ENTRY + i + " invalid, value is null");
289             }
290         }
291     }
292
293     /**
294      * Put a property into the properties if it is not already defined and is not the default value.
295      *
296      * @param returnKafkaProperties the properties to set the value in
297      * @param property the property to put
298      * @param value the value of the property to put
299      * @param defaultValue the default value of the property to put
300      */
301     private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
302             final Object value, final Object defaultValue) {
303
304         // Check if the property is already in the properties
305         if (!returnKafkaProperties.containsKey(property)) {
306             // Not found, so add it
307             returnKafkaProperties.setProperty(property, value.toString());
308         } else {
309             // Found, only overwrite if the property does not have the default value
310             if (value == null) {
311                 returnKafkaProperties.setProperty(property, defaultValue.toString());
312             } else if (!value.toString().contentEquals(defaultValue.toString())) {
313                 returnKafkaProperties.setProperty(property, value.toString());
314             }
315         }
316     }
317 }