851741611b989601fb7e31b7752a5278f55d399e
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
22
23 import java.time.Duration;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Properties;
27
28 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
29 import org.onap.policy.common.parameters.GroupValidationResult;
30 import org.onap.policy.common.parameters.ValidationStatus;
31
32 /**
33  * Apex parameters for Kafka as an event carrier technology.
34  *
35  * @author Liam Fallon (liam.fallon@ericsson.com)
36  */
37 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
38     // @formatter:off
39     /** The label of this carrier technology. */
40     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
41
42     /** The producer plugin class for the Kafka carrier technology. */
43     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
44
45     /** The consumer plugin class for the Kafka carrier technology. */
46     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
47
48     // Repeated strings in messages
49     private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
50
51     // Default parameter values
52     private static final String   DEFAULT_ACKS             = "all";
53     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
54     private static final int      DEFAULT_RETRIES          = 0;
55     private static final int      DEFAULT_BATCH_SIZE       = 16384;
56     private static final int      DEFAULT_LINGER_TIME      = 1;
57     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
58     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
59     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
60     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
61     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
62     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
63     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
64     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
65     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
66     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
67
68     // Parameter property map tokens
69     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
70     private static final String PROPERTY_ACKS               = "acks";
71     private static final String PROPERTY_RETRIES            = "retries";
72     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
73     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
74     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
75     private static final String PROPERTY_GROUP_ID           = "group.id";
76     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
77     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
78     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
79     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
80     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
81     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
82     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
83
84     // kafka carrier parameters
85     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
86     private String   acks              = DEFAULT_ACKS;
87     private int      retries           = DEFAULT_RETRIES;
88     private int      batchSize         = DEFAULT_BATCH_SIZE;
89     private int      lingerTime        = DEFAULT_LINGER_TIME;
90     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
91     private String   groupId           = DEFAULT_GROUP_ID;
92     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
93     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
94     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
95     private String   producerTopic     = DEFAULT_PROD_TOPIC;
96     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
97     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
98     private String   keySerializer     = DEFAULT_STRING_SERZER;
99     private String   valueSerializer   = DEFAULT_STRING_SERZER;
100     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
101     private String   valueDeserializer = DEFAULT_STRING_DESZER;
102     // @formatter:on
103
104     /**
105      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
106      * service.
107      */
108     public KafkaCarrierTechnologyParameters() {
109         super();
110
111         // Set the carrier technology properties for the kafka carrier technology
112         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
113         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
114         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
115     }
116
117     /**
118      * Gets the kafka producer properties.
119      *
120      * @return the kafka producer properties
121      */
122     public Properties getKafkaProducerProperties() {
123         final Properties kafkaProperties = new Properties();
124
125         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
126         kafkaProperties.put(PROPERTY_ACKS, acks);
127         kafkaProperties.put(PROPERTY_RETRIES, retries);
128         kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
129         kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
130         kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
131         kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
132         kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
133
134         return kafkaProperties;
135     }
136
137     /**
138      * Gets the kafka consumer properties.
139      *
140      * @return the kafka consumer properties
141      */
142     public Properties getKafkaConsumerProperties() {
143         final Properties kafkaProperties = new Properties();
144
145         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
146         kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
147         kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
148         kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
149         kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
150         kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
151         kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
152
153         return kafkaProperties;
154     }
155
156     /**
157      * Gets the bootstrap servers.
158      *
159      * @return the bootstrap servers
160      */
161     public String getBootstrapServers() {
162         return bootstrapServers;
163     }
164
165     /**
166      * Gets the acks.
167      *
168      * @return the acks
169      */
170     public String getAcks() {
171         return acks;
172     }
173
174     /**
175      * Gets the retries.
176      *
177      * @return the retries
178      */
179     public int getRetries() {
180         return retries;
181     }
182
183     /**
184      * Gets the batch size.
185      *
186      * @return the batch size
187      */
188     public int getBatchSize() {
189         return batchSize;
190     }
191
192     /**
193      * Gets the linger time.
194      *
195      * @return the linger time
196      */
197     public int getLingerTime() {
198         return lingerTime;
199     }
200
201     /**
202      * Gets the buffer memory.
203      *
204      * @return the buffer memory
205      */
206     public long getBufferMemory() {
207         return bufferMemory;
208     }
209
210     /**
211      * Gets the group id.
212      *
213      * @return the group id
214      */
215     public String getGroupId() {
216         return groupId;
217     }
218
219     /**
220      * Checks if is enable auto commit.
221      *
222      * @return true, if checks if is enable auto commit
223      */
224     public boolean isEnableAutoCommit() {
225         return enableAutoCommit;
226     }
227
228     /**
229      * Gets the auto commit time.
230      *
231      * @return the auto commit time
232      */
233     public int getAutoCommitTime() {
234         return autoCommitTime;
235     }
236
237     /**
238      * Gets the session timeout.
239      *
240      * @return the session timeout
241      */
242     public int getSessionTimeout() {
243         return sessionTimeout;
244     }
245
246     /**
247      * Gets the producer topic.
248      *
249      * @return the producer topic
250      */
251     public String getProducerTopic() {
252         return producerTopic;
253     }
254
255     /**
256      * Gets the consumer poll time.
257      *
258      * @return the consumer poll time
259      */
260     public long getConsumerPollTime() {
261         return consumerPollTime;
262     }
263
264     /**
265      * Gets the consumer poll duration.
266      * @return The poll duration
267      */
268     public Duration getConsumerPollDuration() {
269         return Duration.ofMillis(consumerPollTime);
270     }
271
272     /**
273      * Gets the consumer topic list.
274      *
275      * @return the consumer topic list
276      */
277     public Collection<String> getConsumerTopicList() {
278         return Arrays.asList(consumerTopicList);
279     }
280
281     /**
282      * Gets the key serializer.
283      *
284      * @return the key serializer
285      */
286     public String getKeySerializer() {
287         return keySerializer;
288     }
289
290     /**
291      * Gets the value serializer.
292      *
293      * @return the value serializer
294      */
295     public String getValueSerializer() {
296         return valueSerializer;
297     }
298
299     /**
300      * Gets the key deserializer.
301      *
302      * @return the key deserializer
303      */
304     public String getKeyDeserializer() {
305         return keyDeserializer;
306     }
307
308     /**
309      * Gets the value deserializer.
310      *
311      * @return the value deserializer
312      */
313     public String getValueDeserializer() {
314         return valueDeserializer;
315     }
316
317     /*
318      * (non-Javadoc)
319      *
320      * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
321      */
322     @Override
323     public GroupValidationResult validate() {
324         final GroupValidationResult result = super.validate();
325
326         validateStringParameters(result);
327
328         validateNumericParameters(result);
329
330         validateConsumerTopicList(result);
331
332         validateSerializersAndDeserializers(result);
333
334         return result;
335     }
336
337     /**
338      * Validate that string parameters are correct.
339      * 
340      * @param result the result of the validation
341      */
342     private void validateStringParameters(final GroupValidationResult result) {
343         if (isNullOrBlank(bootstrapServers)) {
344             result.setResult("bootstrapServers", ValidationStatus.INVALID,
345                             "not specified, must be specified as a string of form host:port");
346         }
347
348         if (isNullOrBlank(acks)) {
349             result.setResult("acks", ValidationStatus.INVALID,
350                             "not specified, must be specified as a string with values [0|1|all]");
351         }
352
353         if (isNullOrBlank(groupId)) {
354             result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
355         }
356
357         if (isNullOrBlank(producerTopic)) {
358             result.setResult("producerTopic", ValidationStatus.INVALID,
359                             SPECIFY_AS_STRING_MESSAGE);
360         }
361     }
362
363     /**
364      * Check if numeric parameters are valid.
365      * 
366      * @param result the result of the validation
367      */
368     private void validateNumericParameters(final GroupValidationResult result) {
369         if (retries < 0) {
370             result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
371                             "[" + retries + "] invalid, must be specified as retries >= 0");
372         }
373
374         if (batchSize < 0) {
375             result.setResult("batchSize", ValidationStatus.INVALID,
376                             "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
377         }
378
379         if (lingerTime < 0) {
380             result.setResult("lingerTime", ValidationStatus.INVALID,
381                             "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
382         }
383
384         if (bufferMemory < 0) {
385             result.setResult("bufferMemory", ValidationStatus.INVALID,
386                             "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
387         }
388
389         if (autoCommitTime < 0) {
390             result.setResult("autoCommitTime", ValidationStatus.INVALID,
391                             "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
392         }
393
394         if (sessionTimeout < 0) {
395             result.setResult("sessionTimeout", ValidationStatus.INVALID,
396                             "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
397         }
398
399         if (consumerPollTime < 0) {
400             result.setResult("consumerPollTime", ValidationStatus.INVALID,
401                             "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
402         }
403     }
404
405     /**
406      * Validate the serializers and deserializers.
407      * 
408      * @param result the result of the validation.
409      */
410     private void validateSerializersAndDeserializers(final GroupValidationResult result) {
411         if (isNullOrBlank(keySerializer)) {
412             result.setResult("keySerializer", ValidationStatus.INVALID,
413                             SPECIFY_AS_STRING_MESSAGE);
414         }
415
416         if (isNullOrBlank(valueSerializer)) {
417             result.setResult("valueSerializer", ValidationStatus.INVALID,
418                             SPECIFY_AS_STRING_MESSAGE);
419         }
420
421         if (isNullOrBlank(keyDeserializer)) {
422             result.setResult("keyDeserializer", ValidationStatus.INVALID,
423                             SPECIFY_AS_STRING_MESSAGE);
424         }
425
426         if (isNullOrBlank(valueDeserializer)) {
427             result.setResult("valueDeserializer", ValidationStatus.INVALID,
428                             SPECIFY_AS_STRING_MESSAGE);
429         }
430     }
431
432     private void validateConsumerTopicList(final GroupValidationResult result) {
433         if (consumerTopicList == null || consumerTopicList.length == 0) {
434             result.setResult("consumerTopicList", ValidationStatus.INVALID,
435                             "not specified, must be specified as a list of strings");
436         }
437
438         StringBuilder consumerTopicStringBuilder = new StringBuilder();
439         for (final String consumerTopic : consumerTopicList) {
440             if (consumerTopic == null || consumerTopic.trim().length() == 0) {
441                 consumerTopicStringBuilder.append(consumerTopic + "/");
442             }
443         }
444         if (consumerTopicStringBuilder.length() > 0) {
445             result.setResult("consumerTopicList", ValidationStatus.INVALID,
446                             "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
447         }
448     }
449
450     private boolean isNullOrBlank(final String stringValue) {
451         return stringValue == null || stringValue.trim().length() == 0;
452     }
453 }