927d79ee185d6146684c53f2d9e4fdbc8ccfec0e
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.kafka;
23
24 import java.time.Duration;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Properties;
28
29 import lombok.Getter;
30 import lombok.Setter;
31
32 import org.apache.commons.lang3.StringUtils;
33 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
34 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
35 import org.onap.policy.common.parameters.GroupValidationResult;
36 import org.onap.policy.common.parameters.ValidationStatus;
37 import org.onap.policy.common.parameters.annotations.Min;
38 import org.onap.policy.common.parameters.annotations.NotBlank;
39
40 /**
41  * Apex parameters for Kafka as an event carrier technology.
42  *
43  * @author Liam Fallon (liam.fallon@ericsson.com)
44  */
45 @Getter
46 @Setter
47 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
48     // @formatter:off
49     /** The label of this carrier technology. */
50     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
51
52     /** The producer plugin class for the Kafka carrier technology. */
53     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getName();
54
55     /** The consumer plugin class for the Kafka carrier technology. */
56     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getName();
57
58     // Repeated strings in messages
59     private static final String ENTRY = "entry ";
60     private static final String KAFKA_PROPERTIES = "kafkaProperties";
61
62     // Default parameter values
63     private static final String   DEFAULT_ACKS             = "all";
64     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
65     private static final int      DEFAULT_RETRIES          = 0;
66     private static final int      DEFAULT_BATCH_SIZE       = 16384;
67     private static final int      DEFAULT_LINGER_TIME      = 1;
68     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
69     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
70     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
71     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
72     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
73     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
74     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
75     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
76     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
77     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
78     private static final String   DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getName();
79
80     // Parameter property map tokens
81     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
82     private static final String PROPERTY_ACKS               = "acks";
83     private static final String PROPERTY_RETRIES            = "retries";
84     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
85     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
86     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
87     private static final String PROPERTY_GROUP_ID           = "group.id";
88     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
89     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
90     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
91     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
92     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
93     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
94     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
95     private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
96
97     // kafka carrier parameters
98     @NotBlank
99     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
100     @NotBlank
101     private String   acks              = DEFAULT_ACKS;
102     @Min(value = 0)
103     private int      retries           = DEFAULT_RETRIES;
104     @Min(value = 0)
105     private int      batchSize         = DEFAULT_BATCH_SIZE;
106     @Min(value = 0)
107     private int      lingerTime        = DEFAULT_LINGER_TIME;
108     @Min(value = 0)
109     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
110     @NotBlank
111     private String   groupId           = DEFAULT_GROUP_ID;
112     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
113     @Min(value = 0)
114     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
115     @Min(value = 0)
116     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
117     @NotBlank
118     private String   producerTopic     = DEFAULT_PROD_TOPIC;
119     @Min(value = 0)
120     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
121     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
122     @NotBlank
123     private String   keySerializer     = DEFAULT_STRING_SERZER;
124     @NotBlank
125     private String   valueSerializer   = DEFAULT_STRING_SERZER;
126     @NotBlank
127     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
128     @NotBlank
129     private String   valueDeserializer = DEFAULT_STRING_DESZER;
130     @NotBlank
131     private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
132
133     // All Kafka properties can be specified as an array of key-value pairs
134     private String[][] kafkaProperties = null;
135
136     // @formatter:on
137
138     /**
139      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
140      * service.
141      */
142     public KafkaCarrierTechnologyParameters() {
143         super();
144
145         // Set the carrier technology properties for the kafka carrier technology
146         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
147         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
148         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
149     }
150
151     /**
152      * Gets the kafka producer properties.
153      *
154      * @return the kafka producer properties
155      */
156     public Properties getKafkaProducerProperties() {
157         final Properties returnKafkaProperties = new Properties();
158
159         // Add properties from the Kafka property array
160         if (kafkaProperties != null) {
161             for (int i = 0; i < kafkaProperties.length; i++) {
162                 returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
163             }
164         }
165
166         returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
167         returnKafkaProperties.put(PROPERTY_ACKS, acks);
168         returnKafkaProperties.put(PROPERTY_RETRIES, retries);
169         returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
170         returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
171         returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
172         returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
173         returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
174         returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
175
176         return returnKafkaProperties;
177     }
178
179     /**
180      * Gets the kafka consumer properties.
181      *
182      * @return the kafka consumer properties
183      */
184     public Properties getKafkaConsumerProperties() {
185         final Properties returnKafkaProperties = new Properties();
186
187         // Add properties from the Kafka property array
188         if (kafkaProperties != null) {
189             for (int i = 0; i < kafkaProperties.length; i++) {
190                 returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
191             }
192         }
193
194         returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
195         returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
196         returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
197         returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
198         returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
199         returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
200         returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
201
202         return returnKafkaProperties;
203     }
204
205     /**
206      * Gets the consumer topic list.
207      *
208      * @return the consumer topic list
209      */
210     public Collection<String> getConsumerTopicListAsCollection() {
211         return Arrays.asList(consumerTopicList);
212     }
213
214     /**
215      * Gets the consumer poll duration.
216      * @return The poll duration
217      */
218     public Duration getConsumerPollDuration() {
219         return Duration.ofMillis(consumerPollTime);
220     }
221
222     /**
223      * {@inheritDoc}.
224      */
225     @Override
226     public GroupValidationResult validate() {
227         final GroupValidationResult result = super.validate();
228
229         validateConsumerTopicList(result);
230
231         validateKafkaProperties(result);
232
233         return result;
234     }
235
236     /**
237      * Validate the consumer topic list.
238      *
239      * @param result the result of the validation.
240      */
241     private void validateConsumerTopicList(final GroupValidationResult result) {
242         if (consumerTopicList == null || consumerTopicList.length == 0) {
243             result.setResult("consumerTopicList", ValidationStatus.INVALID,
244                     "not specified, must be specified as a list of strings");
245             return;
246         }
247
248         StringBuilder consumerTopicStringBuilder = new StringBuilder();
249         for (final String consumerTopic : consumerTopicList) {
250             if (StringUtils.isBlank(consumerTopic)) {
251                 consumerTopicStringBuilder.append(consumerTopic + "/");
252             }
253         }
254         if (consumerTopicStringBuilder.length() > 0) {
255             result.setResult("consumerTopicList", ValidationStatus.INVALID,
256                     "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
257         }
258     }
259
260     /**
261      * Validate the kafka properties.
262      *
263      * @param result the result of the validation.
264      */
265     private void validateKafkaProperties(final GroupValidationResult result) {
266         // Kafka properties are optional
267         if (kafkaProperties == null || kafkaProperties.length == 0) {
268             return;
269         }
270
271         for (int i = 0; i < kafkaProperties.length; i++) {
272             if (kafkaProperties[i].length != 2) {
273                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
274                         ENTRY + i + " invalid, kafka properties must be name-value pairs");
275             }
276
277             if (StringUtils.isBlank(kafkaProperties[i][0])) {
278                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
279                         ENTRY + i + " invalid, key is null or blank");
280             }
281
282             if (StringUtils.isBlank(kafkaProperties[i][1])) {
283                 result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
284                         ENTRY + i + " invalid, value is null or blank");
285             }
286         }
287     }
288 }