5ce96662e7c5f4c8f02c361b1f75dc09a3cf96b7
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
22
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Properties;
26
27 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
28 import org.onap.policy.common.parameters.GroupValidationResult;
29 import org.onap.policy.common.parameters.ValidationStatus;
30
31 /**
32  * Apex parameters for Kafka as an event carrier technology.
33  *
34  * @author Liam Fallon (liam.fallon@ericsson.com)
35  */
36 public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters {
37     // @formatter:off
38     /** The label of this carrier technology. */
39     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
40
41     /** The producer plugin class for the Kafka carrier technology. */
42     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
43
44     /** The consumer plugin class for the Kafka carrier technology. */
45     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
46
47     // Repeated strings in messages
48     private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
49
50     // Default parameter values
51     private static final String   DEFAULT_ACKS                = "all";
52     private static final String   DEFAULT_BOOTSTRAP_SERVERS   = "localhost:9092";
53     private static final int      DEFAULT_RETRIES             = 0;
54     private static final int      DEFAULT_BATCH_SIZE          = 16384;
55     private static final int      DEFAULT_LINGER_TIME         = 1;
56     private static final long     DEFAULT_BUFFER_MEMORY       = 33554432;
57     private static final String   DEFAULT_GROUP_ID            = "default-group-id";
58     private static final boolean  DEFAULT_ENABLE_AUTO_COMMIT  = true;
59     private static final int      DEFAULT_AUTO_COMMIT_TIME    = 1000;
60     private static final int      DEFAULT_SESSION_TIMEOUT     = 30000;
61     private static final String   DEFAULT_PRODUCER_TOPIC      = "apex-out";
62     private static final int      DEFAULT_CONSUMER_POLL_TIME  = 100;
63     private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"};
64     private static final String   DEFAULT_KEY_SERIALIZER      = "org.apache.kafka.common.serialization.StringSerializer";
65     private static final String   DEFAULT_VALUE_SERIALIZER    = "org.apache.kafka.common.serialization.StringSerializer";
66     private static final String   DEFAULT_KEY_DESERIALIZER    = "org.apache.kafka.common.serialization.StringDeserializer";
67     private static final String   DEFAULT_VALUE_DESERIALIZER  = "org.apache.kafka.common.serialization.StringDeserializer";
68
69     // Parameter property map tokens
70     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
71     private static final String PROPERTY_ACKS               = "acks";
72     private static final String PROPERTY_RETRIES            = "retries";
73     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
74     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
75     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
76     private static final String PROPERTY_GROUP_ID           = "group.id";
77     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
78     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
79     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
80     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
81     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
82     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
83     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
84
85     // kafka carrier parameters
86     private String   bootstrapServers  = DEFAULT_BOOTSTRAP_SERVERS;
87     private String   acks              = DEFAULT_ACKS;
88     private int      retries           = DEFAULT_RETRIES;
89     private int      batchSize         = DEFAULT_BATCH_SIZE;
90     private int      lingerTime        = DEFAULT_LINGER_TIME;
91     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
92     private String   groupId           = DEFAULT_GROUP_ID;
93     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTO_COMMIT;
94     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
95     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
96     private String   producerTopic     = DEFAULT_PRODUCER_TOPIC;
97     private int      consumerPollTime  = DEFAULT_CONSUMER_POLL_TIME;
98     private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST;
99     private String   keySerializer     = DEFAULT_KEY_SERIALIZER;
100     private String   valueSerializer   = DEFAULT_VALUE_SERIALIZER;
101     private String   keyDeserializer   = DEFAULT_KEY_DESERIALIZER;
102     private String   valueDeserializer = DEFAULT_VALUE_DESERIALIZER;
103     // @formatter:on
104
105     /**
106      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
107      * service.
108      */
109     public KAFKACarrierTechnologyParameters() {
110         super();
111
112         // Set the carrier technology properties for the kafka carrier technology
113         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
114         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
115         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
116     }
117
118     /**
119      * Gets the kafka producer properties.
120      *
121      * @return the kafka producer properties
122      */
123     public Properties getKafkaProducerProperties() {
124         final Properties kafkaProperties = new Properties();
125
126         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
127         kafkaProperties.put(PROPERTY_ACKS, acks);
128         kafkaProperties.put(PROPERTY_RETRIES, retries);
129         kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
130         kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
131         kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
132         kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
133         kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
134
135         return kafkaProperties;
136     }
137
138     /**
139      * Gets the kafka consumer properties.
140      *
141      * @return the kafka consumer properties
142      */
143     public Properties getKafkaConsumerProperties() {
144         final Properties kafkaProperties = new Properties();
145
146         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
147         kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
148         kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
149         kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
150         kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
151         kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
152         kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
153
154         return kafkaProperties;
155     }
156
157     /**
158      * Gets the bootstrap servers.
159      *
160      * @return the bootstrap servers
161      */
162     public String getBootstrapServers() {
163         return bootstrapServers;
164     }
165
166     /**
167      * Gets the acks.
168      *
169      * @return the acks
170      */
171     public String getAcks() {
172         return acks;
173     }
174
175     /**
176      * Gets the retries.
177      *
178      * @return the retries
179      */
180     public int getRetries() {
181         return retries;
182     }
183
184     /**
185      * Gets the batch size.
186      *
187      * @return the batch size
188      */
189     public int getBatchSize() {
190         return batchSize;
191     }
192
193     /**
194      * Gets the linger time.
195      *
196      * @return the linger time
197      */
198     public int getLingerTime() {
199         return lingerTime;
200     }
201
202     /**
203      * Gets the buffer memory.
204      *
205      * @return the buffer memory
206      */
207     public long getBufferMemory() {
208         return bufferMemory;
209     }
210
211     /**
212      * Gets the group id.
213      *
214      * @return the group id
215      */
216     public String getGroupId() {
217         return groupId;
218     }
219
220     /**
221      * Checks if is enable auto commit.
222      *
223      * @return true, if checks if is enable auto commit
224      */
225     public boolean isEnableAutoCommit() {
226         return enableAutoCommit;
227     }
228
229     /**
230      * Gets the auto commit time.
231      *
232      * @return the auto commit time
233      */
234     public int getAutoCommitTime() {
235         return autoCommitTime;
236     }
237
238     /**
239      * Gets the session timeout.
240      *
241      * @return the session timeout
242      */
243     public int getSessionTimeout() {
244         return sessionTimeout;
245     }
246
247     /**
248      * Gets the producer topic.
249      *
250      * @return the producer topic
251      */
252     public String getProducerTopic() {
253         return producerTopic;
254     }
255
256     /**
257      * Gets the consumer poll time.
258      *
259      * @return the consumer poll time
260      */
261     public long getConsumerPollTime() {
262         return consumerPollTime;
263     }
264
265     /**
266      * Gets the consumer topic list.
267      *
268      * @return the consumer topic list
269      */
270     public Collection<String> getConsumerTopicList() {
271         return Arrays.asList(consumerTopicList);
272     }
273
274     /**
275      * Gets the key serializer.
276      *
277      * @return the key serializer
278      */
279     public String getKeySerializer() {
280         return keySerializer;
281     }
282
283     /**
284      * Gets the value serializer.
285      *
286      * @return the value serializer
287      */
288     public String getValueSerializer() {
289         return valueSerializer;
290     }
291
292     /**
293      * Gets the key deserializer.
294      *
295      * @return the key deserializer
296      */
297     public String getKeyDeserializer() {
298         return keyDeserializer;
299     }
300
301     /**
302      * Gets the value deserializer.
303      *
304      * @return the value deserializer
305      */
306     public String getValueDeserializer() {
307         return valueDeserializer;
308     }
309
310     /*
311      * (non-Javadoc)
312      *
313      * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
314      */
315     @Override
316     public GroupValidationResult validate() {
317         final GroupValidationResult result = super.validate();
318
319         if (isNullOrBlank(bootstrapServers)) {
320             result.setResult("bootstrapServers", ValidationStatus.INVALID,
321                             "not specified, must be specified as a string of form host:port");
322         }
323
324         if (isNullOrBlank(acks)) {
325             result.setResult("acks", ValidationStatus.INVALID,
326                             "not specified, must be specified as a string with values [0|1|all]");
327         }
328
329         if (retries < 0) {
330             result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
331                             "[" + retries + "] invalid, must be specified as retries >= 0");
332         }
333
334         if (batchSize < 0) {
335             result.setResult("batchSize", ValidationStatus.INVALID,
336                             "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
337         }
338
339         if (lingerTime < 0) {
340             result.setResult("lingerTime", ValidationStatus.INVALID,
341                             "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
342         }
343
344         if (bufferMemory < 0) {
345             result.setResult("bufferMemory", ValidationStatus.INVALID,
346                             "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
347         }
348
349         if (isNullOrBlank(groupId)) {
350             result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
351         }
352
353         if (autoCommitTime < 0) {
354             result.setResult("autoCommitTime", ValidationStatus.INVALID,
355                             "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
356         }
357
358         if (sessionTimeout < 0) {
359             result.setResult("sessionTimeout", ValidationStatus.INVALID,
360                             "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
361         }
362
363         if (isNullOrBlank(producerTopic)) {
364             result.setResult("producerTopic", ValidationStatus.INVALID,
365                             SPECIFY_AS_STRING_MESSAGE);
366         }
367
368         if (consumerPollTime < 0) {
369             result.setResult("consumerPollTime", ValidationStatus.INVALID,
370                             "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
371         }
372
373         validateConsumerTopicList(result);
374
375         if (isNullOrBlank(keySerializer)) {
376             result.setResult("keySerializer", ValidationStatus.INVALID,
377                             SPECIFY_AS_STRING_MESSAGE);
378         }
379
380         if (isNullOrBlank(valueSerializer)) {
381             result.setResult("valueSerializer", ValidationStatus.INVALID,
382                             SPECIFY_AS_STRING_MESSAGE);
383         }
384
385         if (isNullOrBlank(keyDeserializer)) {
386             result.setResult("keyDeserializer", ValidationStatus.INVALID,
387                             SPECIFY_AS_STRING_MESSAGE);
388         }
389
390         if (isNullOrBlank(valueDeserializer)) {
391             result.setResult("valueDeserializer", ValidationStatus.INVALID,
392                             SPECIFY_AS_STRING_MESSAGE);
393         }
394
395         return result;
396     }
397     
398     private void validateConsumerTopicList(final GroupValidationResult result) {
399         if (consumerTopicList == null || consumerTopicList.length == 0) {
400             result.setResult("consumerTopicList", ValidationStatus.INVALID,
401                             "not specified, must be specified as a list of strings");
402         }
403
404         StringBuilder consumerTopicStringBuilder = new StringBuilder();
405         for (final String consumerTopic : consumerTopicList) {
406             if (consumerTopic == null || consumerTopic.trim().length() == 0) {
407                 consumerTopicStringBuilder.append(consumerTopic + "/");
408             }
409         }
410         if (consumerTopicStringBuilder.length() > 0) {
411             result.setResult("consumerTopicList", ValidationStatus.INVALID,
412                             "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
413         }
414     }
415
416     private boolean isNullOrBlank(final String stringValue) {
417        return stringValue == null || stringValue.trim().length() == 0;
418     }
419 }