/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Properties;
27 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
30 * Apex parameters for Kafka as an event carrier technology.
32 * @author Liam Fallon (liam.fallon@ericsson.com)
34 public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters {
36 /** The label of this carrier technology. */
37 public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
39 /** The producer plugin class for the Kafka carrier technology. */
40 public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
42 /** The consumer plugin class for the Kafka carrier technology. */
43 public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
45 // Default parameter values
46 private static final String DEFAULT_ACKS = "all";
47 private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
48 private static final int DEFAULT_RETRIES = 0;
49 private static final int DEFAULT_BATCH_SIZE = 16384;
50 private static final int DEFAULT_LINGER_TIME = 1;
51 private static final long DEFAULT_BUFFER_MEMORY = 33554432;
52 private static final String DEFAULT_GROUP_ID = "default-group-id";
53 private static final boolean DEFAULT_ENABLE_AUTO_COMMIT = true;
54 private static final int DEFAULT_AUTO_COMMIT_TIME = 1000;
55 private static final int DEFAULT_SESSION_TIMEOUT = 30000;
56 private static final String DEFAULT_PRODUCER_TOPIC = "apex-out";
57 private static final int DEFAULT_CONSUMER_POLL_TIME = 100;
58 private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"};
59 private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
60 private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
61 private static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
62 private static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
64 // Parameter property map tokens
65 private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
66 private static final String PROPERTY_ACKS = "acks";
67 private static final String PROPERTY_RETRIES = "retries";
68 private static final String PROPERTY_BATCH_SIZE = "batch.size";
69 private static final String PROPERTY_LINGER_TIME = "linger.ms";
70 private static final String PROPERTY_BUFFER_MEMORY = "buffer.memory";
71 private static final String PROPERTY_GROUP_ID = "group.id";
72 private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
73 private static final String PROPERTY_AUTO_COMMIT_TIME = "auto.commit.interval.ms";
74 private static final String PROPERTY_SESSION_TIMEOUT = "session.timeout.ms";
75 private static final String PROPERTY_KEY_SERIALIZER = "key.serializer";
76 private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer";
77 private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer";
78 private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
80 // kafka carrier parameters
81 private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
82 private String acks = DEFAULT_ACKS;
83 private int retries = DEFAULT_RETRIES;
84 private int batchSize = DEFAULT_BATCH_SIZE;
85 private int lingerTime = DEFAULT_LINGER_TIME;
86 private long bufferMemory = DEFAULT_BUFFER_MEMORY;
87 private String groupId = DEFAULT_GROUP_ID;
88 private boolean enableAutoCommit = DEFAULT_ENABLE_AUTO_COMMIT;
89 private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
90 private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
91 private String producerTopic = DEFAULT_PRODUCER_TOPIC;
92 private int consumerPollTime = DEFAULT_CONSUMER_POLL_TIME;
93 private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST;
94 private String keySerializer = DEFAULT_KEY_SERIALIZER;
95 private String valueSerializer = DEFAULT_VALUE_SERIALIZER;
96 private String keyDeserializer = DEFAULT_KEY_DESERIALIZER;
97 private String valueDeserializer = DEFAULT_VALUE_DESERIALIZER;
101 * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
104 public KAFKACarrierTechnologyParameters() {
105 super(KAFKACarrierTechnologyParameters.class.getCanonicalName());
107 // Set the carrier technology properties for the kafka carrier technology
108 this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
109 this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
110 this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
114 * Gets the kafka producer properties.
116 * @return the kafka producer properties
118 public Properties getKafkaProducerProperties() {
119 final Properties kafkaProperties = new Properties();
121 kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
122 kafkaProperties.put(PROPERTY_ACKS, acks);
123 kafkaProperties.put(PROPERTY_RETRIES, retries);
124 kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
125 kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
126 kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
127 kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
128 kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
130 return kafkaProperties;
134 * Gets the kafka consumer properties.
136 * @return the kafka consumer properties
138 public Properties getKafkaConsumerProperties() {
139 final Properties kafkaProperties = new Properties();
141 kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
142 kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
143 kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
144 kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
145 kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
146 kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
147 kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
149 return kafkaProperties;
153 * Gets the bootstrap servers.
155 * @return the bootstrap servers
157 public String getBootstrapServers() {
158 return bootstrapServers;
166 public String getAcks() {
173 * @return the retries
175 public int getRetries() {
180 * Gets the batch size.
182 * @return the batch size
184 public int getBatchSize() {
189 * Gets the linger time.
191 * @return the linger time
193 public int getLingerTime() {
198 * Gets the buffer memory.
200 * @return the buffer memory
202 public long getBufferMemory() {
209 * @return the group id
211 public String getGroupId() {
216 * Checks if is enable auto commit.
218 * @return true, if checks if is enable auto commit
220 public boolean isEnableAutoCommit() {
221 return enableAutoCommit;
225 * Gets the auto commit time.
227 * @return the auto commit time
229 public int getAutoCommitTime() {
230 return autoCommitTime;
234 * Gets the session timeout.
236 * @return the session timeout
238 public int getSessionTimeout() {
239 return sessionTimeout;
243 * Gets the producer topic.
245 * @return the producer topic
247 public String getProducerTopic() {
248 return producerTopic;
252 * Gets the consumer poll time.
254 * @return the consumer poll time
256 public long getConsumerPollTime() {
257 return consumerPollTime;
261 * Gets the consumer topic list.
263 * @return the consumer topic list
265 public Collection<String> getConsumerTopicList() {
266 return Arrays.asList(consumerTopicList);
270 * Gets the key serializer.
272 * @return the key serializer
274 public String getKeySerializer() {
275 return keySerializer;
279 * Gets the value serializer.
281 * @return the value serializer
283 public String getValueSerializer() {
284 return valueSerializer;
288 * Gets the key deserializer.
290 * @return the key deserializer
292 public String getKeyDeserializer() {
293 return keyDeserializer;
297 * Gets the value deserializer.
299 * @return the value deserializer
301 public String getValueDeserializer() {
302 return valueDeserializer;
308 * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
311 public String validate() {
312 final StringBuilder errorMessageBuilder = new StringBuilder();
314 errorMessageBuilder.append(super.validate());
316 if (bootstrapServers == null || bootstrapServers.trim().length() == 0) {
318 .append(" bootstrapServers not specified, must be specified as a string of form host:port\n");
321 if (acks == null || acks.trim().length() == 0) {
322 errorMessageBuilder.append(" acks not specified, must be specified as a string with values [0|1|all]\n");
326 errorMessageBuilder.append(" retries [" + retries + "] invalid, must be specified as retries >= 0\n");
331 .append(" batchSize [" + batchSize + "] invalid, must be specified as batchSize >= 0\n");
334 if (lingerTime < 0) {
336 .append(" lingerTime [" + lingerTime + "] invalid, must be specified as lingerTime >= 0\n");
339 if (bufferMemory < 0) {
341 .append(" bufferMemory [" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0\n");
344 if (groupId == null || groupId.trim().length() == 0) {
345 errorMessageBuilder.append(" groupId not specified, must be specified as a string\n");
348 if (autoCommitTime < 0) {
349 errorMessageBuilder.append(
350 " autoCommitTime [" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0\n");
353 if (sessionTimeout < 0) {
354 errorMessageBuilder.append(
355 " sessionTimeout [" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0\n");
358 if (producerTopic == null || producerTopic.trim().length() == 0) {
359 errorMessageBuilder.append(" producerTopic not specified, must be specified as a string\n");
362 if (consumerPollTime < 0) {
363 errorMessageBuilder.append(" consumerPollTime [" + consumerPollTime
364 + "] invalid, must be specified as consumerPollTime >= 0\n");
367 if (consumerTopicList == null || consumerTopicList.length == 0) {
368 errorMessageBuilder.append(" consumerTopicList not specified, must be specified as a list of strings\n");
371 for (final String consumerTopic : consumerTopicList) {
372 if (consumerTopic == null || consumerTopic.trim().length() == 0) {
373 errorMessageBuilder.append(" invalid consumer topic \"" + consumerTopic
374 + "\" specified on consumerTopicList, consumer topics must be specified as strings\n");
378 if (keySerializer == null || keySerializer.trim().length() == 0) {
379 errorMessageBuilder.append(" keySerializer not specified, must be specified as a string\n");
382 if (valueSerializer == null || valueSerializer.trim().length() == 0) {
383 errorMessageBuilder.append(" valueSerializer not specified, must be specified as a string\n");
386 if (keyDeserializer == null || keyDeserializer.trim().length() == 0) {
387 errorMessageBuilder.append(" keyDeserializer not specified, must be specified as a string\n");
390 if (valueDeserializer == null || valueDeserializer.trim().length() == 0) {
391 errorMessageBuilder.append(" valueDeserializer not specified, must be specified as a string\n");
394 return errorMessageBuilder.toString();