475017283a299512f4e9c146535c0a67762295c3
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * SPDX-License-Identifier: Apache-2.0
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.apex.plugins.event.carrier.kafka;
25
26 import java.time.Duration;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.List;
30 import java.util.Properties;
31 import lombok.Getter;
32 import lombok.Setter;
33 import org.apache.commons.lang3.StringUtils;
34 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
35 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
36 import org.onap.policy.common.parameters.BeanValidationResult;
37 import org.onap.policy.common.parameters.ObjectValidationResult;
38 import org.onap.policy.common.parameters.ValidationResult;
39 import org.onap.policy.common.parameters.ValidationStatus;
40 import org.onap.policy.common.parameters.annotations.Min;
41 import org.onap.policy.common.parameters.annotations.NotBlank;
42 import org.onap.policy.models.base.Validated;
43
44 /**
45  * Apex parameters for Kafka as an event carrier technology.
46  *
47  * @author Liam Fallon (liam.fallon@ericsson.com)
48  */
49 @Getter
50 @Setter
51 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
52     // @formatter:off
53     /** The label of this carrier technology. */
54     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
55
56     /** The producer plugin class for the Kafka carrier technology. */
57     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getName();
58
59     /** The consumer plugin class for the Kafka carrier technology. */
60     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getName();
61
62     // Repeated strings in messages
63     private static final String ENTRY = "entry ";
64     private static final String KAFKA_PROPERTIES = "kafkaProperties";
65
66     // Default parameter values
67     private static final String   DEFAULT_ACKS             = "all";
68     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
69     private static final int      DEFAULT_RETRIES          = 0;
70     private static final int      DEFAULT_BATCH_SIZE       = 16384;
71     private static final int      DEFAULT_LINGER_TIME      = 1;
72     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
73     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
74     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
75     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
76     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
77     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
78     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
79     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
80     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
81     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
82     private static final String   DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getName();
83
84     // Parameter property map tokens
85     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
86     private static final String PROPERTY_ACKS               = "acks";
87     private static final String PROPERTY_RETRIES            = "retries";
88     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
89     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
90     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
91     private static final String PROPERTY_GROUP_ID           = "group.id";
92     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
93     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
94     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
95     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
96     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
97     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
98     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
99     private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
100
101     // kafka carrier parameters
102     @NotBlank
103     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
104     @NotBlank
105     private String   acks              = DEFAULT_ACKS;
106     @Min(value = 0)
107     private int      retries           = DEFAULT_RETRIES;
108     @Min(value = 0)
109     private int      batchSize         = DEFAULT_BATCH_SIZE;
110     @Min(value = 0)
111     private int      lingerTime        = DEFAULT_LINGER_TIME;
112     @Min(value = 0)
113     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
114     @NotBlank
115     private String   groupId           = DEFAULT_GROUP_ID;
116     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
117     @Min(value = 0)
118     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
119     @Min(value = 0)
120     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
121     @NotBlank
122     private String   producerTopic     = DEFAULT_PROD_TOPIC;
123     @Min(value = 0)
124     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
125     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
126     @NotBlank
127     private String   keySerializer     = DEFAULT_STRING_SERZER;
128     @NotBlank
129     private String   valueSerializer   = DEFAULT_STRING_SERZER;
130     @NotBlank
131     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
132     @NotBlank
133     private String   valueDeserializer = DEFAULT_STRING_DESZER;
134     @NotBlank
135     private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
136
137     // All Kafka properties can be specified as an array of key-value pairs
138     private String[][] kafkaProperties = null;
139
140     // @formatter:on
141
142     /**
143      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
144      * service.
145      */
146     public KafkaCarrierTechnologyParameters() {
147         super();
148
149         // Set the carrier technology properties for the kafka carrier technology
150         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
151         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
152         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
153     }
154
155     /**
156      * Gets the kafka producer properties.
157      *
158      * @return the kafka producer properties
159      */
160     public Properties getKafkaProducerProperties() {
161         final Properties retKafkaProps = new Properties();
162
163         // Add properties from the Kafka property array
164         if (kafkaProperties != null) {
165             for (int i = 0; i < kafkaProperties.length; i++) {
166                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
167             }
168         }
169
170         // @formatter:off
171         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
172         putExplicitProperty(retKafkaProps, PROPERTY_ACKS,              acks,             DEFAULT_ACKS);
173         putExplicitProperty(retKafkaProps, PROPERTY_RETRIES,           retries,          DEFAULT_RETRIES);
174         putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE,        batchSize,        DEFAULT_BATCH_SIZE);
175         putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME,       lingerTime,       DEFAULT_LINGER_TIME);
176         putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY,     bufferMemory,     DEFAULT_BUFFER_MEMORY);
177         putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER,    keySerializer,    DEFAULT_STRING_SERZER);
178         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER,  valueSerializer,  DEFAULT_STRING_SERZER);
179         putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
180         // @formatter:on
181
182         return retKafkaProps;
183     }
184
185     /**
186      * Gets the kafka consumer properties.
187      *
188      * @return the kafka consumer properties
189      */
190     public Properties getKafkaConsumerProperties() {
191         final Properties retKafkaProps = new Properties();
192
193         // Add properties from the Kafka property array
194         if (kafkaProperties != null) {
195             for (int i = 0; i < kafkaProperties.length; i++) {
196                 retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
197             }
198         }
199
200         // @formatter:off
201         putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS,  bootstrapServers, DEFAULT_BOOT_SERVERS);
202         putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID,           groupId,          DEFAULT_GROUP_ID);
203         putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
204         putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME,   autoCommitTime,   DEFAULT_AUTO_COMMIT_TIME);
205         putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT,    sessionTimeout,   DEFAULT_SESSION_TIMEOUT);
206         putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER,   keyDeserializer,  DEFAULT_STRING_DESZER);
207         putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
208         // @formatter:on
209
210         return retKafkaProps;
211     }
212
213     /**
214      * Gets the consumer topic list.
215      *
216      * @return the consumer topic list
217      */
218     public Collection<String> getConsumerTopicListAsCollection() {
219         return Arrays.asList(consumerTopicList);
220     }
221
222     /**
223      * Gets the consumer poll duration.
224      * @return The poll duration
225      */
226     public Duration getConsumerPollDuration() {
227         return Duration.ofMillis(consumerPollTime);
228     }
229
230     /**
231      * {@inheritDoc}.
232      */
233     @Override
234     public BeanValidationResult validate() {
235         final BeanValidationResult result = super.validate();
236
237         result.addResult(validateConsumerTopicList());
238
239         result.addResult(validateKafkaProperties());
240
241         return result;
242     }
243
244     /**
245      * Validate the consumer topic list.
246      */
247     private ValidationResult validateConsumerTopicList() {
248         if (consumerTopicList == null || consumerTopicList.length == 0) {
249             return new ObjectValidationResult("consumerTopicList", consumerTopicList, ValidationStatus.INVALID,
250                     "not specified, must be specified as a list of strings");
251         }
252
253         BeanValidationResult result = new BeanValidationResult("consumerTopicList", consumerTopicList);
254         int item = 0;
255         for (final String consumerTopic : consumerTopicList) {
256             if (StringUtils.isBlank(consumerTopic)) {
257                 result.addResult(ENTRY + item, consumerTopic, ValidationStatus.INVALID, Validated.IS_BLANK);
258             }
259
260             ++item;
261         }
262
263         return result;
264     }
265
266     /**
267      * Validate the kafka properties.
268      */
269     private ValidationResult validateKafkaProperties() {
270         // Kafka properties are optional
271         if (kafkaProperties == null || kafkaProperties.length == 0) {
272             return null;
273         }
274
275         BeanValidationResult result = new BeanValidationResult(KAFKA_PROPERTIES, kafkaProperties);
276
277         for (int i = 0; i < kafkaProperties.length; i++) {
278             final String label = ENTRY + i;
279             final String[] kafkaProperty = kafkaProperties[i];
280             final List<String> value = (kafkaProperty == null ? null : Arrays.asList(kafkaProperty));
281             final BeanValidationResult result2 = new BeanValidationResult(label, value);
282
283             if (kafkaProperty == null) {
284                 // note: add to result, not result2
285                 result.addResult(label, value, ValidationStatus.INVALID, Validated.IS_NULL);
286
287             } else if (kafkaProperty.length != 2) {
288                 // note: add to result, not result2
289                 result.addResult(label, Arrays.asList(kafkaProperty), ValidationStatus.INVALID,
290                         "kafka properties must be name-value pairs");
291
292             } else if (StringUtils.isBlank(kafkaProperty[0])) {
293                 result2.addResult("key", kafkaProperty[0], ValidationStatus.INVALID, Validated.IS_BLANK);
294
295             } else if (null == kafkaProperty[1]) {
296                 // the value of a property has to be specified as empty in some cases, but should never be null.
297                 result2.addResult("value", kafkaProperty[1], ValidationStatus.INVALID, Validated.IS_NULL);
298             }
299
300             result.addResult(result2);
301         }
302
303         return result;
304     }
305
306     /**
307      * Put a property into the properties if it is not already defined and is not the default value.
308      *
309      * @param returnKafkaProperties the properties to set the value in
310      * @param property the property to put
311      * @param value the value of the property to put
312      * @param defaultValue the default value of the property to put
313      */
314     private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
315             final Object value, final Object defaultValue) {
316
317         // Check if the property is already in the properties
318         if (!returnKafkaProperties.containsKey(property)) {
319             // Not found, so add it
320             returnKafkaProperties.setProperty(property, value.toString());
321         } else {
322             // Found, only overwrite if the property does not have the default value
323             if (value == null) {
324                 returnKafkaProperties.setProperty(property, defaultValue.toString());
325             } else if (!value.toString().contentEquals(defaultValue.toString())) {
326                 returnKafkaProperties.setProperty(property, value.toString());
327             }
328         }
329     }
330 }