2357b180756927e43ac9345a7bb182aecaf28804
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
22
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Properties;
26
27 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
28
29 /**
30  * Apex parameters for Kafka as an event carrier technology.
31  *
32  * @author Liam Fallon (liam.fallon@ericsson.com)
33  */
34 public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters {
35     // @formatter:off
36     /** The label of this carrier technology. */
37     public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
38
39     /** The producer plugin class for the Kafka carrier technology. */
40     public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
41
42     /** The consumer plugin class for the Kafka carrier technology. */
43     public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
44
45     // Default parameter values
46     private static final String   DEFAULT_ACKS                = "all";
47     private static final String   DEFAULT_BOOTSTRAP_SERVERS   = "localhost:9092";
48     private static final int      DEFAULT_RETRIES             = 0;
49     private static final int      DEFAULT_BATCH_SIZE          = 16384;
50     private static final int      DEFAULT_LINGER_TIME         = 1;
51     private static final long     DEFAULT_BUFFER_MEMORY       = 33554432;
52     private static final String   DEFAULT_GROUP_ID            = "default-group-id";
53     private static final boolean  DEFAULT_ENABLE_AUTO_COMMIT  = true;
54     private static final int      DEFAULT_AUTO_COMMIT_TIME    = 1000;
55     private static final int      DEFAULT_SESSION_TIMEOUT     = 30000;
56     private static final String   DEFAULT_PRODUCER_TOPIC      = "apex-out";
57     private static final int      DEFAULT_CONSUMER_POLL_TIME  = 100;
58     private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"};
59     private static final String   DEFAULT_KEY_SERIALIZER      = "org.apache.kafka.common.serialization.StringSerializer";
60     private static final String   DEFAULT_VALUE_SERIALIZER    = "org.apache.kafka.common.serialization.StringSerializer";
61     private static final String   DEFAULT_KEY_DESERIALIZER    = "org.apache.kafka.common.serialization.StringDeserializer";
62     private static final String   DEFAULT_VALUE_DESERIALIZER  = "org.apache.kafka.common.serialization.StringDeserializer";
63
64     // Parameter property map tokens
65     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
66     private static final String PROPERTY_ACKS               = "acks";
67     private static final String PROPERTY_RETRIES            = "retries";
68     private static final String PROPERTY_BATCH_SIZE         = "batch.size";
69     private static final String PROPERTY_LINGER_TIME        = "linger.ms";
70     private static final String PROPERTY_BUFFER_MEMORY      = "buffer.memory";
71     private static final String PROPERTY_GROUP_ID           = "group.id";
72     private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
73     private static final String PROPERTY_AUTO_COMMIT_TIME   = "auto.commit.interval.ms";
74     private static final String PROPERTY_SESSION_TIMEOUT    = "session.timeout.ms";
75     private static final String PROPERTY_KEY_SERIALIZER     = "key.serializer";
76     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
77     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
78     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
79
80     // kafka carrier parameters
81     private String   bootstrapServers  = DEFAULT_BOOTSTRAP_SERVERS;
82     private String   acks              = DEFAULT_ACKS;
83     private int      retries           = DEFAULT_RETRIES;
84     private int      batchSize         = DEFAULT_BATCH_SIZE;
85     private int      lingerTime        = DEFAULT_LINGER_TIME;
86     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
87     private String   groupId           = DEFAULT_GROUP_ID;
88     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTO_COMMIT;
89     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
90     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
91     private String   producerTopic     = DEFAULT_PRODUCER_TOPIC;
92     private int      consumerPollTime  = DEFAULT_CONSUMER_POLL_TIME;
93     private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST;
94     private String   keySerializer     = DEFAULT_KEY_SERIALIZER;
95     private String   valueSerializer   = DEFAULT_VALUE_SERIALIZER;
96     private String   keyDeserializer   = DEFAULT_KEY_DESERIALIZER;
97     private String   valueDeserializer = DEFAULT_VALUE_DESERIALIZER;
98     // @formatter:on
99
100     /**
101      * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
102      * service.
103      */
104     public KAFKACarrierTechnologyParameters() {
105         super(KAFKACarrierTechnologyParameters.class.getCanonicalName());
106
107         // Set the carrier technology properties for the kafka carrier technology
108         this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
109         this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
110         this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
111     }
112
113     /**
114      * Gets the kafka producer properties.
115      *
116      * @return the kafka producer properties
117      */
118     public Properties getKafkaProducerProperties() {
119         final Properties kafkaProperties = new Properties();
120
121         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
122         kafkaProperties.put(PROPERTY_ACKS, acks);
123         kafkaProperties.put(PROPERTY_RETRIES, retries);
124         kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
125         kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
126         kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
127         kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
128         kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
129
130         return kafkaProperties;
131     }
132
133     /**
134      * Gets the kafka consumer properties.
135      *
136      * @return the kafka consumer properties
137      */
138     public Properties getKafkaConsumerProperties() {
139         final Properties kafkaProperties = new Properties();
140
141         kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
142         kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
143         kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
144         kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
145         kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
146         kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
147         kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
148
149         return kafkaProperties;
150     }
151
152     /**
153      * Gets the bootstrap servers.
154      *
155      * @return the bootstrap servers
156      */
157     public String getBootstrapServers() {
158         return bootstrapServers;
159     }
160
161     /**
162      * Gets the acks.
163      *
164      * @return the acks
165      */
166     public String getAcks() {
167         return acks;
168     }
169
170     /**
171      * Gets the retries.
172      *
173      * @return the retries
174      */
175     public int getRetries() {
176         return retries;
177     }
178
179     /**
180      * Gets the batch size.
181      *
182      * @return the batch size
183      */
184     public int getBatchSize() {
185         return batchSize;
186     }
187
188     /**
189      * Gets the linger time.
190      *
191      * @return the linger time
192      */
193     public int getLingerTime() {
194         return lingerTime;
195     }
196
197     /**
198      * Gets the buffer memory.
199      *
200      * @return the buffer memory
201      */
202     public long getBufferMemory() {
203         return bufferMemory;
204     }
205
206     /**
207      * Gets the group id.
208      *
209      * @return the group id
210      */
211     public String getGroupId() {
212         return groupId;
213     }
214
215     /**
216      * Checks if is enable auto commit.
217      *
218      * @return true, if checks if is enable auto commit
219      */
220     public boolean isEnableAutoCommit() {
221         return enableAutoCommit;
222     }
223
224     /**
225      * Gets the auto commit time.
226      *
227      * @return the auto commit time
228      */
229     public int getAutoCommitTime() {
230         return autoCommitTime;
231     }
232
233     /**
234      * Gets the session timeout.
235      *
236      * @return the session timeout
237      */
238     public int getSessionTimeout() {
239         return sessionTimeout;
240     }
241
242     /**
243      * Gets the producer topic.
244      *
245      * @return the producer topic
246      */
247     public String getProducerTopic() {
248         return producerTopic;
249     }
250
251     /**
252      * Gets the consumer poll time.
253      *
254      * @return the consumer poll time
255      */
256     public long getConsumerPollTime() {
257         return consumerPollTime;
258     }
259
260     /**
261      * Gets the consumer topic list.
262      *
263      * @return the consumer topic list
264      */
265     public Collection<String> getConsumerTopicList() {
266         return Arrays.asList(consumerTopicList);
267     }
268
269     /**
270      * Gets the key serializer.
271      *
272      * @return the key serializer
273      */
274     public String getKeySerializer() {
275         return keySerializer;
276     }
277
278     /**
279      * Gets the value serializer.
280      *
281      * @return the value serializer
282      */
283     public String getValueSerializer() {
284         return valueSerializer;
285     }
286
287     /**
288      * Gets the key deserializer.
289      *
290      * @return the key deserializer
291      */
292     public String getKeyDeserializer() {
293         return keyDeserializer;
294     }
295
296     /**
297      * Gets the value deserializer.
298      *
299      * @return the value deserializer
300      */
301     public String getValueDeserializer() {
302         return valueDeserializer;
303     }
304
305     /*
306      * (non-Javadoc)
307      *
308      * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
309      */
310     @Override
311     public String validate() {
312         final StringBuilder errorMessageBuilder = new StringBuilder();
313
314         errorMessageBuilder.append(super.validate());
315
316         if (bootstrapServers == null || bootstrapServers.trim().length() == 0) {
317             errorMessageBuilder
318                     .append("  bootstrapServers not specified, must be specified as a string of form host:port\n");
319         }
320
321         if (acks == null || acks.trim().length() == 0) {
322             errorMessageBuilder.append("  acks not specified, must be specified as a string with values [0|1|all]\n");
323         }
324
325         if (retries < 0) {
326             errorMessageBuilder.append("  retries [" + retries + "] invalid, must be specified as retries >= 0\n");
327         }
328
329         if (batchSize < 0) {
330             errorMessageBuilder
331                     .append("  batchSize [" + batchSize + "] invalid, must be specified as batchSize >= 0\n");
332         }
333
334         if (lingerTime < 0) {
335             errorMessageBuilder
336                     .append("  lingerTime [" + lingerTime + "] invalid, must be specified as lingerTime >= 0\n");
337         }
338
339         if (bufferMemory < 0) {
340             errorMessageBuilder
341                     .append("  bufferMemory [" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0\n");
342         }
343
344         if (groupId == null || groupId.trim().length() == 0) {
345             errorMessageBuilder.append("  groupId not specified, must be specified as a string\n");
346         }
347
348         if (autoCommitTime < 0) {
349             errorMessageBuilder.append(
350                     "  autoCommitTime [" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0\n");
351         }
352
353         if (sessionTimeout < 0) {
354             errorMessageBuilder.append(
355                     "  sessionTimeout [" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0\n");
356         }
357
358         if (producerTopic == null || producerTopic.trim().length() == 0) {
359             errorMessageBuilder.append("  producerTopic not specified, must be specified as a string\n");
360         }
361
362         if (consumerPollTime < 0) {
363             errorMessageBuilder.append("  consumerPollTime [" + consumerPollTime
364                     + "] invalid, must be specified as consumerPollTime >= 0\n");
365         }
366
367         if (consumerTopicList == null || consumerTopicList.length == 0) {
368             errorMessageBuilder.append("  consumerTopicList not specified, must be specified as a list of strings\n");
369         }
370
371         for (final String consumerTopic : consumerTopicList) {
372             if (consumerTopic == null || consumerTopic.trim().length() == 0) {
373                 errorMessageBuilder.append("  invalid consumer topic \"" + consumerTopic
374                         + "\" specified on consumerTopicList, consumer topics must be specified as strings\n");
375             }
376         }
377
378         if (keySerializer == null || keySerializer.trim().length() == 0) {
379             errorMessageBuilder.append("  keySerializer not specified, must be specified as a string\n");
380         }
381
382         if (valueSerializer == null || valueSerializer.trim().length() == 0) {
383             errorMessageBuilder.append("  valueSerializer not specified, must be specified as a string\n");
384         }
385
386         if (keyDeserializer == null || keyDeserializer.trim().length() == 0) {
387             errorMessageBuilder.append("  keyDeserializer not specified, must be specified as a string\n");
388         }
389
390         if (valueDeserializer == null || valueDeserializer.trim().length() == 0) {
391             errorMessageBuilder.append("  valueDeserializer not specified, must be specified as a string\n");
392         }
393
394         return errorMessageBuilder.toString();
395     }
396 }