2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
23 import java.time.Duration;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Properties;
28 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
29 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
30 import org.onap.policy.common.parameters.GroupValidationResult;
31 import org.onap.policy.common.parameters.ValidationStatus;
34 * Apex parameters for Kafka as an event carrier technology.
36 * @author Liam Fallon (liam.fallon@ericsson.com)
38 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
40 /** The label of this carrier technology. */
41 public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
43 /** The producer plugin class for the Kafka carrier technology. */
44 public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
46 /** The consumer plugin class for the Kafka carrier technology. */
47 public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
49 // Repeated strings in messages
50 private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
52 // Default parameter values
53 private static final String DEFAULT_ACKS = "all";
54 private static final String DEFAULT_BOOT_SERVERS = "localhost:9092";
55 private static final int DEFAULT_RETRIES = 0;
56 private static final int DEFAULT_BATCH_SIZE = 16384;
57 private static final int DEFAULT_LINGER_TIME = 1;
58 private static final long DEFAULT_BUFFER_MEMORY = 33554432;
59 private static final String DEFAULT_GROUP_ID = "default-group-id";
60 private static final boolean DEFAULT_ENABLE_AUTOCMIT = true;
61 private static final int DEFAULT_AUTO_COMMIT_TIME = 1000;
62 private static final int DEFAULT_SESSION_TIMEOUT = 30000;
63 private static final String DEFAULT_PROD_TOPIC = "apex-out";
64 private static final int DEFAULT_CONS_POLL_TIME = 100;
65 private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"};
66 private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
67 private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
68 private static final String DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getCanonicalName();
70 // Parameter property map tokens
71 private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
72 private static final String PROPERTY_ACKS = "acks";
73 private static final String PROPERTY_RETRIES = "retries";
74 private static final String PROPERTY_BATCH_SIZE = "batch.size";
75 private static final String PROPERTY_LINGER_TIME = "linger.ms";
76 private static final String PROPERTY_BUFFER_MEMORY = "buffer.memory";
77 private static final String PROPERTY_GROUP_ID = "group.id";
78 private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
79 private static final String PROPERTY_AUTO_COMMIT_TIME = "auto.commit.interval.ms";
80 private static final String PROPERTY_SESSION_TIMEOUT = "session.timeout.ms";
81 private static final String PROPERTY_KEY_SERIALIZER = "key.serializer";
82 private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer";
83 private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer";
84 private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
85 private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class";
87 // kafka carrier parameters
88 private String bootstrapServers = DEFAULT_BOOT_SERVERS;
89 private String acks = DEFAULT_ACKS;
90 private int retries = DEFAULT_RETRIES;
91 private int batchSize = DEFAULT_BATCH_SIZE;
92 private int lingerTime = DEFAULT_LINGER_TIME;
93 private long bufferMemory = DEFAULT_BUFFER_MEMORY;
94 private String groupId = DEFAULT_GROUP_ID;
95 private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT;
96 private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
97 private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
98 private String producerTopic = DEFAULT_PROD_TOPIC;
99 private int consumerPollTime = DEFAULT_CONS_POLL_TIME;
100 private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
101 private String keySerializer = DEFAULT_STRING_SERZER;
102 private String valueSerializer = DEFAULT_STRING_SERZER;
103 private String keyDeserializer = DEFAULT_STRING_DESZER;
104 private String valueDeserializer = DEFAULT_STRING_DESZER;
105 private String partitionerClass = DEFAULT_PARTITIONR_CLASS;
/**
 * Creates the Kafka carrier technology parameters, labelling the instance with
 * {@link #KAFKA_CARRIER_TECHNOLOGY_LABEL} and recording the Kafka event producer and consumer plugin
 * classes on it.
 */
public KafkaCarrierTechnologyParameters() {
    super();

    // Identify this parameter set and the plugin classes that implement the carrier technology
    setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
    setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
    setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
}
122 * Gets the kafka producer properties.
124 * @return the kafka producer properties
126 public Properties getKafkaProducerProperties() {
127 final Properties kafkaProperties = new Properties();
129 kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
130 kafkaProperties.put(PROPERTY_ACKS, acks);
131 kafkaProperties.put(PROPERTY_RETRIES, retries);
132 kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
133 kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
134 kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
135 kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
136 kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
137 kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
139 return kafkaProperties;
143 * Gets the kafka consumer properties.
145 * @return the kafka consumer properties
147 public Properties getKafkaConsumerProperties() {
148 final Properties kafkaProperties = new Properties();
150 kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
151 kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
152 kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
153 kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
154 kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
155 kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
156 kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
158 return kafkaProperties;
162 * Gets the bootstrap servers.
164 * @return the bootstrap servers
166 public String getBootstrapServers() {
167 return bootstrapServers;
175 public String getAcks() {
182 * @return the retries
184 public int getRetries() {
189 * Gets the batch size.
191 * @return the batch size
193 public int getBatchSize() {
198 * Gets the linger time.
200 * @return the linger time
202 public int getLingerTime() {
207 * Gets the buffer memory.
209 * @return the buffer memory
211 public long getBufferMemory() {
218 * @return the group id
220 public String getGroupId() {
225 * Checks if is enable auto commit.
227 * @return true, if checks if is enable auto commit
229 public boolean isEnableAutoCommit() {
230 return enableAutoCommit;
234 * Gets the auto commit time.
236 * @return the auto commit time
238 public int getAutoCommitTime() {
239 return autoCommitTime;
243 * Gets the session timeout.
245 * @return the session timeout
247 public int getSessionTimeout() {
248 return sessionTimeout;
252 * Gets the producer topic.
254 * @return the producer topic
256 public String getProducerTopic() {
257 return producerTopic;
261 * Gets the consumer poll time.
263 * @return the consumer poll time
265 public long getConsumerPollTime() {
266 return consumerPollTime;
270 * Gets the consumer poll duration.
271 * @return The poll duration
273 public Duration getConsumerPollDuration() {
274 return Duration.ofMillis(consumerPollTime);
278 * Gets the consumer topic list.
280 * @return the consumer topic list
282 public Collection<String> getConsumerTopicList() {
283 return Arrays.asList(consumerTopicList);
287 * Gets the key serializer.
289 * @return the key serializer
291 public String getKeySerializer() {
292 return keySerializer;
296 * Gets the value serializer.
298 * @return the value serializer
300 public String getValueSerializer() {
301 return valueSerializer;
305 * Gets the key deserializer.
307 * @return the key deserializer
309 public String getKeyDeserializer() {
310 return keyDeserializer;
314 * Gets the value deserializer.
316 * @return the value deserializer
318 public String getValueDeserializer() {
319 return valueDeserializer;
323 * Gets the value deserializer.
325 * @return the value deserializer
327 public String getPartitionerClass() {
328 return partitionerClass;
334 * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
337 public GroupValidationResult validate() {
338 final GroupValidationResult result = super.validate();
340 validateStringParameters(result);
342 validateNumericParameters(result);
344 validateConsumerTopicList(result);
346 validateSerializersAndDeserializers(result);
352 * Validate that string parameters are correct.
354 * @param result the result of the validation
356 private void validateStringParameters(final GroupValidationResult result) {
357 if (isNullOrBlank(bootstrapServers)) {
358 result.setResult("bootstrapServers", ValidationStatus.INVALID,
359 "not specified, must be specified as a string of form host:port");
362 if (isNullOrBlank(acks)) {
363 result.setResult("acks", ValidationStatus.INVALID,
364 "not specified, must be specified as a string with values [0|1|all]");
367 if (isNullOrBlank(groupId)) {
368 result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
371 if (isNullOrBlank(producerTopic)) {
372 result.setResult("producerTopic", ValidationStatus.INVALID,
373 SPECIFY_AS_STRING_MESSAGE);
376 if (isNullOrBlank(partitionerClass)) {
377 result.setResult("partitionerClass", ValidationStatus.INVALID,
378 SPECIFY_AS_STRING_MESSAGE);
383 * Check if numeric parameters are valid.
385 * @param result the result of the validation
387 private void validateNumericParameters(final GroupValidationResult result) {
389 result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
390 "[" + retries + "] invalid, must be specified as retries >= 0");
394 result.setResult("batchSize", ValidationStatus.INVALID,
395 "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
398 if (lingerTime < 0) {
399 result.setResult("lingerTime", ValidationStatus.INVALID,
400 "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
403 if (bufferMemory < 0) {
404 result.setResult("bufferMemory", ValidationStatus.INVALID,
405 "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
408 if (autoCommitTime < 0) {
409 result.setResult("autoCommitTime", ValidationStatus.INVALID,
410 "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
413 if (sessionTimeout < 0) {
414 result.setResult("sessionTimeout", ValidationStatus.INVALID,
415 "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
418 if (consumerPollTime < 0) {
419 result.setResult("consumerPollTime", ValidationStatus.INVALID,
420 "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
425 * Validate the serializers and deserializers.
427 * @param result the result of the validation.
429 private void validateSerializersAndDeserializers(final GroupValidationResult result) {
430 if (isNullOrBlank(keySerializer)) {
431 result.setResult("keySerializer", ValidationStatus.INVALID,
432 SPECIFY_AS_STRING_MESSAGE);
435 if (isNullOrBlank(valueSerializer)) {
436 result.setResult("valueSerializer", ValidationStatus.INVALID,
437 SPECIFY_AS_STRING_MESSAGE);
440 if (isNullOrBlank(keyDeserializer)) {
441 result.setResult("keyDeserializer", ValidationStatus.INVALID,
442 SPECIFY_AS_STRING_MESSAGE);
445 if (isNullOrBlank(valueDeserializer)) {
446 result.setResult("valueDeserializer", ValidationStatus.INVALID,
447 SPECIFY_AS_STRING_MESSAGE);
451 private void validateConsumerTopicList(final GroupValidationResult result) {
452 if (consumerTopicList == null || consumerTopicList.length == 0) {
453 result.setResult("consumerTopicList", ValidationStatus.INVALID,
454 "not specified, must be specified as a list of strings");
457 StringBuilder consumerTopicStringBuilder = new StringBuilder();
458 for (final String consumerTopic : consumerTopicList) {
459 if (consumerTopic == null || consumerTopic.trim().length() == 0) {
460 consumerTopicStringBuilder.append(consumerTopic + "/");
463 if (consumerTopicStringBuilder.length() > 0) {
464 result.setResult("consumerTopicList", ValidationStatus.INVALID,
465 "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
469 private boolean isNullOrBlank(final String stringValue) {
470 return stringValue == null || stringValue.trim().length() == 0;