2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Properties;
27 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
28 import org.onap.policy.common.parameters.GroupValidationResult;
29 import org.onap.policy.common.parameters.ValidationStatus;
32 * Apex parameters for Kafka as an event carrier technology.
34 * @author Liam Fallon (liam.fallon@ericsson.com)
36 public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters {
38 /** The label of this carrier technology. */
39 public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
41 /** The producer plugin class for the Kafka carrier technology. */
42 public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
44 /** The consumer plugin class for the Kafka carrier technology. */
45 public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
47 // Repeated strings in messages
48 private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
50 // Default parameter values
51 private static final String DEFAULT_ACKS = "all";
52 private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
53 private static final int DEFAULT_RETRIES = 0;
54 private static final int DEFAULT_BATCH_SIZE = 16384;
55 private static final int DEFAULT_LINGER_TIME = 1;
56 private static final long DEFAULT_BUFFER_MEMORY = 33554432;
57 private static final String DEFAULT_GROUP_ID = "default-group-id";
58 private static final boolean DEFAULT_ENABLE_AUTO_COMMIT = true;
59 private static final int DEFAULT_AUTO_COMMIT_TIME = 1000;
60 private static final int DEFAULT_SESSION_TIMEOUT = 30000;
61 private static final String DEFAULT_PRODUCER_TOPIC = "apex-out";
62 private static final int DEFAULT_CONSUMER_POLL_TIME = 100;
63 private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"};
64 private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
65 private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
66 private static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
67 private static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
69 // Parameter property map tokens
70 private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
71 private static final String PROPERTY_ACKS = "acks";
72 private static final String PROPERTY_RETRIES = "retries";
73 private static final String PROPERTY_BATCH_SIZE = "batch.size";
74 private static final String PROPERTY_LINGER_TIME = "linger.ms";
75 private static final String PROPERTY_BUFFER_MEMORY = "buffer.memory";
76 private static final String PROPERTY_GROUP_ID = "group.id";
77 private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
78 private static final String PROPERTY_AUTO_COMMIT_TIME = "auto.commit.interval.ms";
79 private static final String PROPERTY_SESSION_TIMEOUT = "session.timeout.ms";
80 private static final String PROPERTY_KEY_SERIALIZER = "key.serializer";
81 private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer";
82 private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer";
83 private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
85 // kafka carrier parameters
86 private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
87 private String acks = DEFAULT_ACKS;
88 private int retries = DEFAULT_RETRIES;
89 private int batchSize = DEFAULT_BATCH_SIZE;
90 private int lingerTime = DEFAULT_LINGER_TIME;
91 private long bufferMemory = DEFAULT_BUFFER_MEMORY;
92 private String groupId = DEFAULT_GROUP_ID;
93 private boolean enableAutoCommit = DEFAULT_ENABLE_AUTO_COMMIT;
94 private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
95 private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
96 private String producerTopic = DEFAULT_PRODUCER_TOPIC;
97 private int consumerPollTime = DEFAULT_CONSUMER_POLL_TIME;
98 private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST;
99 private String keySerializer = DEFAULT_KEY_SERIALIZER;
100 private String valueSerializer = DEFAULT_VALUE_SERIALIZER;
101 private String keyDeserializer = DEFAULT_KEY_DESERIALIZER;
102 private String valueDeserializer = DEFAULT_VALUE_DESERIALIZER;
106 * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
109 public KAFKACarrierTechnologyParameters() {
112 // Set the carrier technology properties for the kafka carrier technology
113 this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
114 this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
115 this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
119 * Gets the kafka producer properties.
121 * @return the kafka producer properties
123 public Properties getKafkaProducerProperties() {
124 final Properties kafkaProperties = new Properties();
126 kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
127 kafkaProperties.put(PROPERTY_ACKS, acks);
128 kafkaProperties.put(PROPERTY_RETRIES, retries);
129 kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
130 kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
131 kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
132 kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
133 kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
135 return kafkaProperties;
139 * Gets the kafka consumer properties.
141 * @return the kafka consumer properties
143 public Properties getKafkaConsumerProperties() {
144 final Properties kafkaProperties = new Properties();
146 kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
147 kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
148 kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
149 kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
150 kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
151 kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
152 kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
154 return kafkaProperties;
158 * Gets the bootstrap servers.
160 * @return the bootstrap servers
162 public String getBootstrapServers() {
163 return bootstrapServers;
171 public String getAcks() {
178 * @return the retries
180 public int getRetries() {
185 * Gets the batch size.
187 * @return the batch size
189 public int getBatchSize() {
194 * Gets the linger time.
196 * @return the linger time
198 public int getLingerTime() {
203 * Gets the buffer memory.
205 * @return the buffer memory
207 public long getBufferMemory() {
214 * @return the group id
216 public String getGroupId() {
221 * Checks if is enable auto commit.
223 * @return true, if checks if is enable auto commit
225 public boolean isEnableAutoCommit() {
226 return enableAutoCommit;
230 * Gets the auto commit time.
232 * @return the auto commit time
234 public int getAutoCommitTime() {
235 return autoCommitTime;
239 * Gets the session timeout.
241 * @return the session timeout
243 public int getSessionTimeout() {
244 return sessionTimeout;
248 * Gets the producer topic.
250 * @return the producer topic
252 public String getProducerTopic() {
253 return producerTopic;
257 * Gets the consumer poll time.
259 * @return the consumer poll time
261 public long getConsumerPollTime() {
262 return consumerPollTime;
266 * Gets the consumer topic list.
268 * @return the consumer topic list
270 public Collection<String> getConsumerTopicList() {
271 return Arrays.asList(consumerTopicList);
275 * Gets the key serializer.
277 * @return the key serializer
279 public String getKeySerializer() {
280 return keySerializer;
284 * Gets the value serializer.
286 * @return the value serializer
288 public String getValueSerializer() {
289 return valueSerializer;
293 * Gets the key deserializer.
295 * @return the key deserializer
297 public String getKeyDeserializer() {
298 return keyDeserializer;
302 * Gets the value deserializer.
304 * @return the value deserializer
306 public String getValueDeserializer() {
307 return valueDeserializer;
313 * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
316 public GroupValidationResult validate() {
317 final GroupValidationResult result = super.validate();
319 if (isNullOrBlank(bootstrapServers)) {
320 result.setResult("bootstrapServers", ValidationStatus.INVALID,
321 "not specified, must be specified as a string of form host:port");
324 if (isNullOrBlank(acks)) {
325 result.setResult("acks", ValidationStatus.INVALID,
326 "not specified, must be specified as a string with values [0|1|all]");
330 result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
331 "[" + retries + "] invalid, must be specified as retries >= 0");
335 result.setResult("batchSize", ValidationStatus.INVALID,
336 "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
339 if (lingerTime < 0) {
340 result.setResult("lingerTime", ValidationStatus.INVALID,
341 "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
344 if (bufferMemory < 0) {
345 result.setResult("bufferMemory", ValidationStatus.INVALID,
346 "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
349 if (isNullOrBlank(groupId)) {
350 result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
353 if (autoCommitTime < 0) {
354 result.setResult("autoCommitTime", ValidationStatus.INVALID,
355 "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
358 if (sessionTimeout < 0) {
359 result.setResult("sessionTimeout", ValidationStatus.INVALID,
360 "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
363 if (isNullOrBlank(producerTopic)) {
364 result.setResult("producerTopic", ValidationStatus.INVALID,
365 SPECIFY_AS_STRING_MESSAGE);
368 if (consumerPollTime < 0) {
369 result.setResult("consumerPollTime", ValidationStatus.INVALID,
370 "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
373 validateConsumerTopicList(result);
375 if (isNullOrBlank(keySerializer)) {
376 result.setResult("keySerializer", ValidationStatus.INVALID,
377 SPECIFY_AS_STRING_MESSAGE);
380 if (isNullOrBlank(valueSerializer)) {
381 result.setResult("valueSerializer", ValidationStatus.INVALID,
382 SPECIFY_AS_STRING_MESSAGE);
385 if (isNullOrBlank(keyDeserializer)) {
386 result.setResult("keyDeserializer", ValidationStatus.INVALID,
387 SPECIFY_AS_STRING_MESSAGE);
390 if (isNullOrBlank(valueDeserializer)) {
391 result.setResult("valueDeserializer", ValidationStatus.INVALID,
392 SPECIFY_AS_STRING_MESSAGE);
398 private void validateConsumerTopicList(final GroupValidationResult result) {
399 if (consumerTopicList == null || consumerTopicList.length == 0) {
400 result.setResult("consumerTopicList", ValidationStatus.INVALID,
401 "not specified, must be specified as a list of strings");
404 StringBuilder consumerTopicStringBuilder = new StringBuilder();
405 for (final String consumerTopic : consumerTopicList) {
406 if (consumerTopic == null || consumerTopic.trim().length() == 0) {
407 consumerTopicStringBuilder.append(consumerTopic + "/");
410 if (consumerTopicStringBuilder.length() > 0) {
411 result.setResult("consumerTopicList", ValidationStatus.INVALID,
412 "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
416 private boolean isNullOrBlank(final String stringValue) {
417 return stringValue == null || stringValue.trim().length() == 0;