7c24ce1aa9deec32682f51cfb1808935d645ccf9
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
22
23 import java.time.Duration;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Properties;
27
28 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
29 import org.onap.policy.common.parameters.GroupValidationResult;
30 import org.onap.policy.common.parameters.ValidationStatus;
31
32 /**
33  * Apex parameters for Kafka as an event carrier technology.
34  *
35  * @author Liam Fallon (liam.fallon@ericsson.com)
36  */
37 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
38     // @formatter:off
39     /** The label of this carrier technology. */
40     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
41
42     /** The producer plugin class for the Kafka carrier technology. */
43     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
44
45     /** The consumer plugin class for the Kafka carrier technology. */
46     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
47
48     // Repeated strings in messages
49     private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
50
51     // Default parameter values
52     private static final String   DEFAULT_ACKS             = "all";
53     private static final String   DEFAULT_BOOT_SERVERS     = "localhost:9092";
54     private static final int      DEFAULT_RETRIES          = 0;
55     private static final int      DEFAULT_BATCH_SIZE       = 16384;
56     private static final int      DEFAULT_LINGER_TIME      = 1;
57     private static final long     DEFAULT_BUFFER_MEMORY    = 33554432;
58     private static final String   DEFAULT_GROUP_ID         = "default-group-id";
59     private static final boolean  DEFAULT_ENABLE_AUTOCMIT  = true;
60     private static final int      DEFAULT_AUTO_COMMIT_TIME = 1000;
61     private static final int      DEFAULT_SESSION_TIMEOUT  = 30000;
62     private static final String   DEFAULT_PROD_TOPIC       = "apex-out";
63     private static final int      DEFAULT_CONS_POLL_TIME   = 100;
64     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
65     private static final String   DEFAULT_KEY_SERZER       = "org.apache.kafka.common.serialization.StringSerializer";
66     private static final String   DEFAULT_VAL_SERZER       = "org.apache.kafka.common.serialization.StringSerializer";
67     private static final String   DEFAULT_KEY_DESZER       = "org.apache.kafka.common.serialization.StringDeserializer";
68     private static final String   DEFAULT_VALUE_DESZER     = "org.apache.kafka.common.serialization.StringDeserializer";
69
70     // Parameter property map tokens
71     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
72     private static final String PROPERTY_ACKS               = "acks";
73     private static final String PROPERTY_RETRIES            = "retries";
74     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
75     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
76     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
77     private static final String PROPERTY_GROUP_ID           = "group.id";
78     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
79     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
80     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
81     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
82     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
83     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
84     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
85
86     // kafka carrier parameters
87     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
88     private String   acks              = DEFAULT_ACKS;
89     private int      retries           = DEFAULT_RETRIES;
90     private int      batchSize         = DEFAULT_BATCH_SIZE;
91     private int      lingerTime        = DEFAULT_LINGER_TIME;
92     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
93     private String   groupId           = DEFAULT_GROUP_ID;
94     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
95     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
96     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
97     private String   producerTopic     = DEFAULT_PROD_TOPIC;
98     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
99     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
100     private String   keySerializer     = DEFAULT_KEY_SERZER;
101     private String   valueSerializer   = DEFAULT_VAL_SERZER;
102     private String   keyDeserializer   = DEFAULT_KEY_DESZER;
103     private String   valueDeserializer = DEFAULT_VALUE_DESZER;
104     // @formatter:on
105
106     /**
107      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
108      * service.
109      */
110     public KafkaCarrierTechnologyParameters() {
111         super();
112
113         // Set the carrier technology properties for the kafka carrier technology
114         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
115         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
116         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
117     }
118
119     /**
120      * Gets the kafka producer properties.
121      *
122      * @return the kafka producer properties
123      */
124     public Properties getKafkaProducerProperties() {
125         final Properties kafkaProperties = new Properties();
126
127         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
128         kafkaProperties.put(PROPERTY_ACKS, acks);
129         kafkaProperties.put(PROPERTY_RETRIES, retries);
130         kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
131         kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
132         kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
133         kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
134         kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
135
136         return kafkaProperties;
137     }
138
139     /**
140      * Gets the kafka consumer properties.
141      *
142      * @return the kafka consumer properties
143      */
144     public Properties getKafkaConsumerProperties() {
145         final Properties kafkaProperties = new Properties();
146
147         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
148         kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
149         kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
150         kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
151         kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
152         kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
153         kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
154
155         return kafkaProperties;
156     }
157
158     /**
159      * Gets the bootstrap servers.
160      *
161      * @return the bootstrap servers
162      */
163     public String getBootstrapServers() {
164         return bootstrapServers;
165     }
166
167     /**
168      * Gets the acks.
169      *
170      * @return the acks
171      */
172     public String getAcks() {
173         return acks;
174     }
175
176     /**
177      * Gets the retries.
178      *
179      * @return the retries
180      */
181     public int getRetries() {
182         return retries;
183     }
184
185     /**
186      * Gets the batch size.
187      *
188      * @return the batch size
189      */
190     public int getBatchSize() {
191         return batchSize;
192     }
193
194     /**
195      * Gets the linger time.
196      *
197      * @return the linger time
198      */
199     public int getLingerTime() {
200         return lingerTime;
201     }
202
203     /**
204      * Gets the buffer memory.
205      *
206      * @return the buffer memory
207      */
208     public long getBufferMemory() {
209         return bufferMemory;
210     }
211
212     /**
213      * Gets the group id.
214      *
215      * @return the group id
216      */
217     public String getGroupId() {
218         return groupId;
219     }
220
221     /**
222      * Checks if is enable auto commit.
223      *
224      * @return true, if checks if is enable auto commit
225      */
226     public boolean isEnableAutoCommit() {
227         return enableAutoCommit;
228     }
229
230     /**
231      * Gets the auto commit time.
232      *
233      * @return the auto commit time
234      */
235     public int getAutoCommitTime() {
236         return autoCommitTime;
237     }
238
239     /**
240      * Gets the session timeout.
241      *
242      * @return the session timeout
243      */
244     public int getSessionTimeout() {
245         return sessionTimeout;
246     }
247
248     /**
249      * Gets the producer topic.
250      *
251      * @return the producer topic
252      */
253     public String getProducerTopic() {
254         return producerTopic;
255     }
256
257     /**
258      * Gets the consumer poll time.
259      *
260      * @return the consumer poll time
261      */
262     public long getConsumerPollTime() {
263         return consumerPollTime;
264     }
265
266     /**
267      * Gets the consumer poll duration.
268      * @return The poll duration
269      */
270     public Duration getConsumerPollDuration() {
271         return Duration.ofMillis(consumerPollTime);
272     }
273
274     /**
275      * Gets the consumer topic list.
276      *
277      * @return the consumer topic list
278      */
279     public Collection<String> getConsumerTopicList() {
280         return Arrays.asList(consumerTopicList);
281     }
282
283     /**
284      * Gets the key serializer.
285      *
286      * @return the key serializer
287      */
288     public String getKeySerializer() {
289         return keySerializer;
290     }
291
292     /**
293      * Gets the value serializer.
294      *
295      * @return the value serializer
296      */
297     public String getValueSerializer() {
298         return valueSerializer;
299     }
300
301     /**
302      * Gets the key deserializer.
303      *
304      * @return the key deserializer
305      */
306     public String getKeyDeserializer() {
307         return keyDeserializer;
308     }
309
310     /**
311      * Gets the value deserializer.
312      *
313      * @return the value deserializer
314      */
315     public String getValueDeserializer() {
316         return valueDeserializer;
317     }
318
319     /*
320      * (non-Javadoc)
321      *
322      * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
323      */
324     @Override
325     public GroupValidationResult validate() {
326         final GroupValidationResult result = super.validate();
327
328         if (isNullOrBlank(bootstrapServers)) {
329             result.setResult("bootstrapServers", ValidationStatus.INVALID,
330                             "not specified, must be specified as a string of form host:port");
331         }
332
333         if (isNullOrBlank(acks)) {
334             result.setResult("acks", ValidationStatus.INVALID,
335                             "not specified, must be specified as a string with values [0|1|all]");
336         }
337
338         if (retries < 0) {
339             result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
340                             "[" + retries + "] invalid, must be specified as retries >= 0");
341         }
342
343         if (batchSize < 0) {
344             result.setResult("batchSize", ValidationStatus.INVALID,
345                             "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
346         }
347
348         if (lingerTime < 0) {
349             result.setResult("lingerTime", ValidationStatus.INVALID,
350                             "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
351         }
352
353         if (bufferMemory < 0) {
354             result.setResult("bufferMemory", ValidationStatus.INVALID,
355                             "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
356         }
357
358         if (isNullOrBlank(groupId)) {
359             result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
360         }
361
362         if (autoCommitTime < 0) {
363             result.setResult("autoCommitTime", ValidationStatus.INVALID,
364                             "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
365         }
366
367         if (sessionTimeout < 0) {
368             result.setResult("sessionTimeout", ValidationStatus.INVALID,
369                             "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
370         }
371
372         if (isNullOrBlank(producerTopic)) {
373             result.setResult("producerTopic", ValidationStatus.INVALID,
374                             SPECIFY_AS_STRING_MESSAGE);
375         }
376
377         if (consumerPollTime < 0) {
378             result.setResult("consumerPollTime", ValidationStatus.INVALID,
379                             "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
380         }
381
382         validateConsumerTopicList(result);
383
384         if (isNullOrBlank(keySerializer)) {
385             result.setResult("keySerializer", ValidationStatus.INVALID,
386                             SPECIFY_AS_STRING_MESSAGE);
387         }
388
389         if (isNullOrBlank(valueSerializer)) {
390             result.setResult("valueSerializer", ValidationStatus.INVALID,
391                             SPECIFY_AS_STRING_MESSAGE);
392         }
393
394         if (isNullOrBlank(keyDeserializer)) {
395             result.setResult("keyDeserializer", ValidationStatus.INVALID,
396                             SPECIFY_AS_STRING_MESSAGE);
397         }
398
399         if (isNullOrBlank(valueDeserializer)) {
400             result.setResult("valueDeserializer", ValidationStatus.INVALID,
401                             SPECIFY_AS_STRING_MESSAGE);
402         }
403
404         return result;
405     }
406
407     private void validateConsumerTopicList(final GroupValidationResult result) {
408         if (consumerTopicList == null || consumerTopicList.length == 0) {
409             result.setResult("consumerTopicList", ValidationStatus.INVALID,
410                             "not specified, must be specified as a list of strings");
411         }
412
413         StringBuilder consumerTopicStringBuilder = new StringBuilder();
414         for (final String consumerTopic : consumerTopicList) {
415             if (consumerTopic == null || consumerTopic.trim().length() == 0) {
416                 consumerTopicStringBuilder.append(consumerTopic + "/");
417             }
418         }
419         if (consumerTopicStringBuilder.length() > 0) {
420             result.setResult("consumerTopicList", ValidationStatus.INVALID,
421                             "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
422         }
423     }
424
425     private boolean isNullOrBlank(final String stringValue) {
426         return stringValue == null || stringValue.trim().length() == 0;
427     }
428 }