6aa9d53e688cb9838a3399c1ff80686d7d6a07a8
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
22
23 import java.time.Duration;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Properties;
27
28 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
29 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
30 import org.onap.policy.common.parameters.GroupValidationResult;
31 import org.onap.policy.common.parameters.ValidationStatus;
32
33 /**
34  * Apex parameters for Kafka as an event carrier technology.
35  *
36  * @author Liam Fallon (liam.fallon@ericsson.com)
37  */
38 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
39     // @formatter:off
40     /** The label of this carrier technology. */
41     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
42
43     /** The producer plugin class for the Kafka carrier technology. */
44     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
45
46     /** The consumer plugin class for the Kafka carrier technology. */
47     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
48
49     // Repeated strings in messages
50     private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
51
52     // Default parameter values
53     private static final String   DEFAULT_ACKS             = "all";
54     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
55     private static final int      DEFAULT_RETRIES          = 0;
56     private static final int      DEFAULT_BATCH_SIZE       = 16384;
57     private static final int      DEFAULT_LINGER_TIME      = 1;
58     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
59     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
60     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
61     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
62     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
63     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
64     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
65     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
66     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
67     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
68     private static final String   DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getCanonicalName();
69
70     // Parameter property map tokens
71     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
72     private static final String PROPERTY_ACKS               = "acks";
73     private static final String PROPERTY_RETRIES            = "retries";
74     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
75     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
76     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
77     private static final String PROPERTY_GROUP_ID           = "group.id";
78     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
79     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
80     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
81     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
82     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
83     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
84     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
85     private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
86
87     // kafka carrier parameters
88     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
89     private String   acks              = DEFAULT_ACKS;
90     private int      retries           = DEFAULT_RETRIES;
91     private int      batchSize         = DEFAULT_BATCH_SIZE;
92     private int      lingerTime        = DEFAULT_LINGER_TIME;
93     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
94     private String   groupId           = DEFAULT_GROUP_ID;
95     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
96     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
97     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
98     private String   producerTopic     = DEFAULT_PROD_TOPIC;
99     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
100     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
101     private String   keySerializer     = DEFAULT_STRING_SERZER;
102     private String   valueSerializer   = DEFAULT_STRING_SERZER;
103     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
104     private String   valueDeserializer = DEFAULT_STRING_DESZER;
105     private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
106     // @formatter:on
107
108     /**
109      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
110      * service.
111      */
112     public KafkaCarrierTechnologyParameters() {
113         super();
114
115         // Set the carrier technology properties for the kafka carrier technology
116         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
117         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
118         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
119     }
120
121     /**
122      * Gets the kafka producer properties.
123      *
124      * @return the kafka producer properties
125      */
126     public Properties getKafkaProducerProperties() {
127         final Properties kafkaProperties = new Properties();
128
129         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
130         kafkaProperties.put(PROPERTY_ACKS, acks);
131         kafkaProperties.put(PROPERTY_RETRIES, retries);
132         kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
133         kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
134         kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
135         kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
136         kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
137         kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
138
139         return kafkaProperties;
140     }
141
142     /**
143      * Gets the kafka consumer properties.
144      *
145      * @return the kafka consumer properties
146      */
147     public Properties getKafkaConsumerProperties() {
148         final Properties kafkaProperties = new Properties();
149
150         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
151         kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
152         kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
153         kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
154         kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
155         kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
156         kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
157
158         return kafkaProperties;
159     }
160
161     /**
162      * Gets the bootstrap servers.
163      *
164      * @return the bootstrap servers
165      */
166     public String getBootstrapServers() {
167         return bootstrapServers;
168     }
169
170     /**
171      * Gets the acks.
172      *
173      * @return the acks
174      */
175     public String getAcks() {
176         return acks;
177     }
178
179     /**
180      * Gets the retries.
181      *
182      * @return the retries
183      */
184     public int getRetries() {
185         return retries;
186     }
187
188     /**
189      * Gets the batch size.
190      *
191      * @return the batch size
192      */
193     public int getBatchSize() {
194         return batchSize;
195     }
196
197     /**
198      * Gets the linger time.
199      *
200      * @return the linger time
201      */
202     public int getLingerTime() {
203         return lingerTime;
204     }
205
206     /**
207      * Gets the buffer memory.
208      *
209      * @return the buffer memory
210      */
211     public long getBufferMemory() {
212         return bufferMemory;
213     }
214
215     /**
216      * Gets the group id.
217      *
218      * @return the group id
219      */
220     public String getGroupId() {
221         return groupId;
222     }
223
224     /**
225      * Checks if is enable auto commit.
226      *
227      * @return true, if checks if is enable auto commit
228      */
229     public boolean isEnableAutoCommit() {
230         return enableAutoCommit;
231     }
232
233     /**
234      * Gets the auto commit time.
235      *
236      * @return the auto commit time
237      */
238     public int getAutoCommitTime() {
239         return autoCommitTime;
240     }
241
242     /**
243      * Gets the session timeout.
244      *
245      * @return the session timeout
246      */
247     public int getSessionTimeout() {
248         return sessionTimeout;
249     }
250
251     /**
252      * Gets the producer topic.
253      *
254      * @return the producer topic
255      */
256     public String getProducerTopic() {
257         return producerTopic;
258     }
259
260     /**
261      * Gets the consumer poll time.
262      *
263      * @return the consumer poll time
264      */
265     public long getConsumerPollTime() {
266         return consumerPollTime;
267     }
268
269     /**
270      * Gets the consumer poll duration.
271      * @return The poll duration
272      */
273     public Duration getConsumerPollDuration() {
274         return Duration.ofMillis(consumerPollTime);
275     }
276
277     /**
278      * Gets the consumer topic list.
279      *
280      * @return the consumer topic list
281      */
282     public Collection<String> getConsumerTopicList() {
283         return Arrays.asList(consumerTopicList);
284     }
285
286     /**
287      * Gets the key serializer.
288      *
289      * @return the key serializer
290      */
291     public String getKeySerializer() {
292         return keySerializer;
293     }
294
295     /**
296      * Gets the value serializer.
297      *
298      * @return the value serializer
299      */
300     public String getValueSerializer() {
301         return valueSerializer;
302     }
303
304     /**
305      * Gets the key deserializer.
306      *
307      * @return the key deserializer
308      */
309     public String getKeyDeserializer() {
310         return keyDeserializer;
311     }
312
313     /**
314      * Gets the value deserializer.
315      *
316      * @return the value deserializer
317      */
318     public String getValueDeserializer() {
319         return valueDeserializer;
320     }
321
322     /**
323      * Gets the value deserializer.
324      *
325      * @return the value deserializer
326      */
327     public String getPartitionerClass() {
328         return partitionerClass;
329     }
330
331     /*
332      * (non-Javadoc)
333      *
334      * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
335      */
336     @Override
337     public GroupValidationResult validate() {
338         final GroupValidationResult result = super.validate();
339
340         validateStringParameters(result);
341
342         validateNumericParameters(result);
343
344         validateConsumerTopicList(result);
345
346         validateSerializersAndDeserializers(result);
347
348         return result;
349     }
350
351     /**
352      * Validate that string parameters are correct.
353      *
354      * @param result the result of the validation
355      */
356     private void validateStringParameters(final GroupValidationResult result) {
357         if (isNullOrBlank(bootstrapServers)) {
358             result.setResult("bootstrapServers", ValidationStatus.INVALID,
359                             "not specified, must be specified as a string of form host:port");
360         }
361
362         if (isNullOrBlank(acks)) {
363             result.setResult("acks", ValidationStatus.INVALID,
364                             "not specified, must be specified as a string with values [0|1|all]");
365         }
366
367         if (isNullOrBlank(groupId)) {
368             result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
369         }
370
371         if (isNullOrBlank(producerTopic)) {
372             result.setResult("producerTopic", ValidationStatus.INVALID,
373                             SPECIFY_AS_STRING_MESSAGE);
374         }
375
376         if (isNullOrBlank(partitionerClass)) {
377             result.setResult("partitionerClass", ValidationStatus.INVALID,
378                             SPECIFY_AS_STRING_MESSAGE);
379         }
380     }
381
382     /**
383      * Check if numeric parameters are valid.
384      *
385      * @param result the result of the validation
386      */
387     private void validateNumericParameters(final GroupValidationResult result) {
388         if (retries < 0) {
389             result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
390                             "[" + retries + "] invalid, must be specified as retries >= 0");
391         }
392
393         if (batchSize < 0) {
394             result.setResult("batchSize", ValidationStatus.INVALID,
395                             "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
396         }
397
398         if (lingerTime < 0) {
399             result.setResult("lingerTime", ValidationStatus.INVALID,
400                             "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
401         }
402
403         if (bufferMemory < 0) {
404             result.setResult("bufferMemory", ValidationStatus.INVALID,
405                             "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
406         }
407
408         if (autoCommitTime < 0) {
409             result.setResult("autoCommitTime", ValidationStatus.INVALID,
410                             "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
411         }
412
413         if (sessionTimeout < 0) {
414             result.setResult("sessionTimeout", ValidationStatus.INVALID,
415                             "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
416         }
417
418         if (consumerPollTime < 0) {
419             result.setResult("consumerPollTime", ValidationStatus.INVALID,
420                             "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
421         }
422     }
423
424     /**
425      * Validate the serializers and deserializers.
426      *
427      * @param result the result of the validation.
428      */
429     private void validateSerializersAndDeserializers(final GroupValidationResult result) {
430         if (isNullOrBlank(keySerializer)) {
431             result.setResult("keySerializer", ValidationStatus.INVALID,
432                             SPECIFY_AS_STRING_MESSAGE);
433         }
434
435         if (isNullOrBlank(valueSerializer)) {
436             result.setResult("valueSerializer", ValidationStatus.INVALID,
437                             SPECIFY_AS_STRING_MESSAGE);
438         }
439
440         if (isNullOrBlank(keyDeserializer)) {
441             result.setResult("keyDeserializer", ValidationStatus.INVALID,
442                             SPECIFY_AS_STRING_MESSAGE);
443         }
444
445         if (isNullOrBlank(valueDeserializer)) {
446             result.setResult("valueDeserializer", ValidationStatus.INVALID,
447                             SPECIFY_AS_STRING_MESSAGE);
448         }
449     }
450
451     private void validateConsumerTopicList(final GroupValidationResult result) {
452         if (consumerTopicList == null || consumerTopicList.length == 0) {
453             result.setResult("consumerTopicList", ValidationStatus.INVALID,
454                             "not specified, must be specified as a list of strings");
455         }
456
457         StringBuilder consumerTopicStringBuilder = new StringBuilder();
458         for (final String consumerTopic : consumerTopicList) {
459             if (consumerTopic == null || consumerTopic.trim().length() == 0) {
460                 consumerTopicStringBuilder.append(consumerTopic + "/");
461             }
462         }
463         if (consumerTopicStringBuilder.length() > 0) {
464             result.setResult("consumerTopicList", ValidationStatus.INVALID,
465                             "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
466         }
467     }
468
469     private boolean isNullOrBlank(final String stringValue) {
470         return stringValue == null || stringValue.trim().length() == 0;
471     }
472 }