Allow Kafka plugin to take arbitrary properties 55/90455/2 2.2.0
authorliamfallon <liam.fallon@est.tech>
Tue, 25 Jun 2019 16:10:25 +0000 (16:10 +0000)
committerliamfallon <liam.fallon@est.tech>
Tue, 25 Jun 2019 16:10:25 +0000 (16:10 +0000)
This change adds support for arbitrary Kafka properties to be passed to
Kafka through the Kafka plugin.

Issue-ID: POLICY-1818
Change-Id: I4389876286747b250c8abe492e9e31674a9483c9
Signed-off-by: liamfallon <liam.fallon@est.tech>
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java

index affd10c..6860bca 100644 (file)
@@ -92,7 +92,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
 
         // Kick off the Kafka consumer
         kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
-        kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+        kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
                     + kafkaConsumerProperties.getConsumerTopicList());
@@ -154,7 +154,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
     public void run() {
         // Kick off the Kafka consumer
         kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
-        kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+        kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
                     + kafkaConsumerProperties.getConsumerTopicList());
@@ -181,7 +181,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
 
     /**
      * Trace a record if trace is enabled.
-     * 
+     *
      * @param record the record to trace
      */
     private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
index 36947ee..f66dbfe 100644 (file)
@@ -25,16 +25,24 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
 
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
 import org.onap.policy.common.parameters.GroupValidationResult;
 import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
 
 /**
  * Apex parameters for Kafka as an event carrier technology.
  *
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
+@Getter
+@Setter
 public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
     // @formatter:off
     /** The label of this carrier technology. */
@@ -48,6 +56,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
 
     // Repeated strings in messages
     private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
+    private static final String ENTRY = "entry ";
+    private static final String KAFKA_PROPERTIES = "kafkaProperties";
 
     // Default parameter values
     private static final String   DEFAULT_ACKS             = "all";
@@ -85,24 +95,44 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
     private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
 
     // kafka carrier parameters
+    @NotBlank
     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
+    @NotBlank
     private String   acks              = DEFAULT_ACKS;
+    @Min(value = 0)
     private int      retries           = DEFAULT_RETRIES;
+    @Min(value = 0)
     private int      batchSize         = DEFAULT_BATCH_SIZE;
+    @Min(value = 0)
     private int      lingerTime        = DEFAULT_LINGER_TIME;
+    @Min(value = 0)
     private long     bufferMemory      = DEFAULT_BUFFER_MEMORY;
+    @NotBlank
     private String   groupId           = DEFAULT_GROUP_ID;
     private boolean  enableAutoCommit  = DEFAULT_ENABLE_AUTOCMIT;
+    @Min(value = 0)
     private int      autoCommitTime    = DEFAULT_AUTO_COMMIT_TIME;
+    @Min(value = 0)
     private int      sessionTimeout    = DEFAULT_SESSION_TIMEOUT;
+    @NotBlank
     private String   producerTopic     = DEFAULT_PROD_TOPIC;
+    @Min(value = 0)
     private int      consumerPollTime  = DEFAULT_CONS_POLL_TIME;
     private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
+    @NotBlank
     private String   keySerializer     = DEFAULT_STRING_SERZER;
+    @NotBlank
     private String   valueSerializer   = DEFAULT_STRING_SERZER;
+    @NotBlank
     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
+    @NotBlank
     private String   valueDeserializer = DEFAULT_STRING_DESZER;
+    @NotBlank
     private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
+
+    // All Kafka properties can be specified as an array of key-value pairs
+    private String[][] kafkaProperties = null;
+
     // @formatter:on
 
     /**
@@ -124,19 +154,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
      * @return the kafka producer properties
      */
     public Properties getKafkaProducerProperties() {
-        final Properties kafkaProperties = new Properties();
-
-        kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
-        kafkaProperties.put(PROPERTY_ACKS, acks);
-        kafkaProperties.put(PROPERTY_RETRIES, retries);
-        kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
-        kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
-        kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
-        kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
-        kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
-        kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
-        return kafkaProperties;
+        final Properties returnKafkaProperties = new Properties();
+
+        // Add properties from the Kafka property array
+        if (kafkaProperties != null) {
+            for (int i = 0; i < kafkaProperties.length; i++) {
+                returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+            }
+        }
+
+        returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+        returnKafkaProperties.put(PROPERTY_ACKS, acks);
+        returnKafkaProperties.put(PROPERTY_RETRIES, retries);
+        returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
+        returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
+        returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
+        returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
+        returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+        returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
+
+        return returnKafkaProperties;
     }
 
     /**
@@ -145,125 +182,33 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
      * @return the kafka consumer properties
      */
     public Properties getKafkaConsumerProperties() {
-        final Properties kafkaProperties = new Properties();
-
-        kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
-        kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
-        kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
-        kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
-        kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
-        kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
-        kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
-        return kafkaProperties;
-    }
-
-    /**
-     * Gets the bootstrap servers.
-     *
-     * @return the bootstrap servers
-     */
-    public String getBootstrapServers() {
-        return bootstrapServers;
-    }
-
-    /**
-     * Gets the acks.
-     *
-     * @return the acks
-     */
-    public String getAcks() {
-        return acks;
-    }
+        final Properties returnKafkaProperties = new Properties();
 
-    /**
-     * Gets the retries.
-     *
-     * @return the retries
-     */
-    public int getRetries() {
-        return retries;
-    }
-
-    /**
-     * Gets the batch size.
-     *
-     * @return the batch size
-     */
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    /**
-     * Gets the linger time.
-     *
-     * @return the linger time
-     */
-    public int getLingerTime() {
-        return lingerTime;
-    }
-
-    /**
-     * Gets the buffer memory.
-     *
-     * @return the buffer memory
-     */
-    public long getBufferMemory() {
-        return bufferMemory;
-    }
-
-    /**
-     * Gets the group id.
-     *
-     * @return the group id
-     */
-    public String getGroupId() {
-        return groupId;
-    }
-
-    /**
-     * Checks if is enable auto commit.
-     *
-     * @return true, if checks if is enable auto commit
-     */
-    public boolean isEnableAutoCommit() {
-        return enableAutoCommit;
-    }
-
-    /**
-     * Gets the auto commit time.
-     *
-     * @return the auto commit time
-     */
-    public int getAutoCommitTime() {
-        return autoCommitTime;
-    }
+        // Add properties from the Kafka property array
+        if (kafkaProperties != null) {
+            for (int i = 0; i < kafkaProperties.length; i++) {
+                returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+            }
+        }
 
-    /**
-     * Gets the session timeout.
-     *
-     * @return the session timeout
-     */
-    public int getSessionTimeout() {
-        return sessionTimeout;
-    }
+        returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+        returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
+        returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
+        returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
+        returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
+        returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
+        returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
 
-    /**
-     * Gets the producer topic.
-     *
-     * @return the producer topic
-     */
-    public String getProducerTopic() {
-        return producerTopic;
+        return returnKafkaProperties;
     }
 
     /**
-     * Gets the consumer poll time.
+     * Gets the consumer topic list.
      *
-     * @return the consumer poll time
+     * @return the consumer topic list
      */
-    public long getConsumerPollTime() {
-        return consumerPollTime;
+    public Collection<String> getConsumerTopicListAsCollection() {
+        return Arrays.asList(consumerTopicList);
     }
 
     /**
@@ -274,60 +219,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
         return Duration.ofMillis(consumerPollTime);
     }
 
-    /**
-     * Gets the consumer topic list.
-     *
-     * @return the consumer topic list
-     */
-    public Collection<String> getConsumerTopicList() {
-        return Arrays.asList(consumerTopicList);
-    }
-
-    /**
-     * Gets the key serializer.
-     *
-     * @return the key serializer
-     */
-    public String getKeySerializer() {
-        return keySerializer;
-    }
-
-    /**
-     * Gets the value serializer.
-     *
-     * @return the value serializer
-     */
-    public String getValueSerializer() {
-        return valueSerializer;
-    }
-
-    /**
-     * Gets the key deserializer.
-     *
-     * @return the key deserializer
-     */
-    public String getKeyDeserializer() {
-        return keyDeserializer;
-    }
-
-    /**
-     * Gets the value deserializer.
-     *
-     * @return the value deserializer
-     */
-    public String getValueDeserializer() {
-        return valueDeserializer;
-    }
-
-    /**
-     * Gets the value deserializer.
-     *
-     * @return the value deserializer
-     */
-    public String getPartitionerClass() {
-        return partitionerClass;
-    }
-
     /*
      * (non-Javadoc)
      *
@@ -337,136 +228,63 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
     public GroupValidationResult validate() {
         final GroupValidationResult result = super.validate();
 
-        validateStringParameters(result);
-
-        validateNumericParameters(result);
-
         validateConsumerTopicList(result);
 
-        validateSerializersAndDeserializers(result);
+        validateKafkaProperties(result);
 
         return result;
     }
 
     /**
-     * Validate that string parameters are correct.
-     *
-     * @param result the result of the validation
-     */
-    private void validateStringParameters(final GroupValidationResult result) {
-        if (isNullOrBlank(bootstrapServers)) {
-            result.setResult("bootstrapServers", ValidationStatus.INVALID,
-                            "not specified, must be specified as a string of form host:port");
-        }
-
-        if (isNullOrBlank(acks)) {
-            result.setResult("acks", ValidationStatus.INVALID,
-                            "not specified, must be specified as a string with values [0|1|all]");
-        }
-
-        if (isNullOrBlank(groupId)) {
-            result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
-        }
-
-        if (isNullOrBlank(producerTopic)) {
-            result.setResult("producerTopic", ValidationStatus.INVALID,
-                            SPECIFY_AS_STRING_MESSAGE);
-        }
-
-        if (isNullOrBlank(partitionerClass)) {
-            result.setResult("partitionerClass", ValidationStatus.INVALID,
-                            SPECIFY_AS_STRING_MESSAGE);
-        }
-    }
-
-    /**
-     * Check if numeric parameters are valid.
-     *
-     * @param result the result of the validation
-     */
-    private void validateNumericParameters(final GroupValidationResult result) {
-        if (retries < 0) {
-            result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
-                            "[" + retries + "] invalid, must be specified as retries >= 0");
-        }
-
-        if (batchSize < 0) {
-            result.setResult("batchSize", ValidationStatus.INVALID,
-                            "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
-        }
-
-        if (lingerTime < 0) {
-            result.setResult("lingerTime", ValidationStatus.INVALID,
-                            "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
-        }
-
-        if (bufferMemory < 0) {
-            result.setResult("bufferMemory", ValidationStatus.INVALID,
-                            "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
-        }
-
-        if (autoCommitTime < 0) {
-            result.setResult("autoCommitTime", ValidationStatus.INVALID,
-                            "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
-        }
-
-        if (sessionTimeout < 0) {
-            result.setResult("sessionTimeout", ValidationStatus.INVALID,
-                            "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
-        }
-
-        if (consumerPollTime < 0) {
-            result.setResult("consumerPollTime", ValidationStatus.INVALID,
-                            "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
-        }
-    }
-
-    /**
-     * Validate the serializers and deserializers.
+     * Validate the consumer topic list.
      *
      * @param result the result of the validation.
      */
-    private void validateSerializersAndDeserializers(final GroupValidationResult result) {
-        if (isNullOrBlank(keySerializer)) {
-            result.setResult("keySerializer", ValidationStatus.INVALID,
-                            SPECIFY_AS_STRING_MESSAGE);
-        }
-
-        if (isNullOrBlank(valueSerializer)) {
-            result.setResult("valueSerializer", ValidationStatus.INVALID,
-                            SPECIFY_AS_STRING_MESSAGE);
-        }
-
-        if (isNullOrBlank(keyDeserializer)) {
-            result.setResult("keyDeserializer", ValidationStatus.INVALID,
-                            SPECIFY_AS_STRING_MESSAGE);
-        }
-
-        if (isNullOrBlank(valueDeserializer)) {
-            result.setResult("valueDeserializer", ValidationStatus.INVALID,
-                            SPECIFY_AS_STRING_MESSAGE);
-        }
-    }
-
     private void validateConsumerTopicList(final GroupValidationResult result) {
         if (consumerTopicList == null || consumerTopicList.length == 0) {
             result.setResult("consumerTopicList", ValidationStatus.INVALID,
-                            "not specified, must be specified as a list of strings");
+                    "not specified, must be specified as a list of strings");
+            return;
         }
 
         StringBuilder consumerTopicStringBuilder = new StringBuilder();
         for (final String consumerTopic : consumerTopicList) {
-            if (consumerTopic == null || consumerTopic.trim().length() == 0) {
+            if (StringUtils.isBlank(consumerTopic)) {
                 consumerTopicStringBuilder.append(consumerTopic + "/");
             }
         }
         if (consumerTopicStringBuilder.length() > 0) {
             result.setResult("consumerTopicList", ValidationStatus.INVALID,
-                            "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
+                    "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
         }
     }
 
-    private boolean isNullOrBlank(final String stringValue) {
-        return stringValue == null || stringValue.trim().length() == 0;
+    /**
+     * Validate the kafka properties.
+     *
+     * @param result the result of the validation.
+     */
+    private void validateKafkaProperties(final GroupValidationResult result) {
+        // Kafka properties are optional
+        if (kafkaProperties == null || kafkaProperties.length == 0) {
+            return;
+        }
+
+        for (int i = 0; i < kafkaProperties.length; i++) {
+            if (kafkaProperties[i].length != 2) {
+                result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+                        ENTRY + i + " invalid, kafka properties must be name-value pairs");
+            }
+
+            if (StringUtils.isBlank(kafkaProperties[i][0])) {
+                result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+                        ENTRY + i + " invalid, key is null or blank");
+            }
+
+            if (StringUtils.isBlank(kafkaProperties[i][1])) {
+                result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+                        ENTRY + i + " invalid, value is null or blank");
+            }
+        }
     }
 }
index 6eca1dc..2f5405b 100644 (file)
 
 package org.onap.policy.apex.plugins.event.carrier.kafka;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Properties;
-import org.junit.Before;
+
 import org.junit.Test;
-import org.onap.policy.common.parameters.GroupValidationResult;
 
 public class KafkaCarrierTechnologyParametersTest {
-
-    KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = null;
-    Properties kafkaProducerProperties = null;
-    Properties kafkaConsumerProperties = null;
-    GroupValidationResult result = null;
-
-    /**
-     * Set up testing.
-     *
-     * @throws Exception on setup errors
-     */
-    @Before
-    public void setUp() throws Exception {
-        kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
-        kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
-        kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
-    }
-
     @Test
     public void testKafkaCarrierTechnologyParameters() {
+        KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
         assertNotNull(kafkaCarrierTechnologyParameters);
+
+        assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
     }
 
     @Test
     public void testGetKafkaProducerProperties() {
+        KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+        Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
+        assertNotNull(kafkaProducerProperties);
+        assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+        assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+        assertEquals(null, kafkaProducerProperties.get("group.id"));
+        assertEquals(null, kafkaProducerProperties.get("Property0"));
+        assertEquals(null, kafkaProducerProperties.get("Property1"));
+        assertEquals(null, kafkaProducerProperties.get("Property2"));
+
+        // @formatter:off
+        String[][] kafkaProperties = {
+            {
+                "Property0", "Value0"
+            },
+            {
+                "Property1", "Value1"
+            }
+        };
+        // @formatter:on
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+        kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
         assertNotNull(kafkaProducerProperties);
+        assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+        assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+        assertEquals(null, kafkaProducerProperties.get("group.id"));
+        assertEquals("Value0", kafkaProducerProperties.get("Property0"));
+        assertEquals("Value1", kafkaProducerProperties.get("Property1"));
+        assertEquals(null, kafkaProducerProperties.get("Property2"));
     }
 
     @Test
     public void testGetKafkaConsumerProperties() {
+        KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+        Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
         assertNotNull(kafkaConsumerProperties);
+        assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+        assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+        assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+        assertEquals(null, kafkaConsumerProperties.get("Property0"));
+        assertEquals(null, kafkaConsumerProperties.get("Property1"));
+        assertEquals(null, kafkaConsumerProperties.get("Property2"));
+
+        // @formatter:off
+        String[][] kafkaProperties = {
+            {
+                "Property0", "Value0"
+            },
+            {
+                "Property1", "Value1"
+            }
+        };
+        // @formatter:on
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+        kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
+        assertNotNull(kafkaConsumerProperties);
+        assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+        assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+        assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+        assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
+        assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
+        assertEquals(null, kafkaConsumerProperties.get("Property2"));
     }
 
     @Test
     public void testValidate() {
-        result = kafkaCarrierTechnologyParameters.validate();
-        assertNotNull(result);
+        KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+        assertNotNull(kafkaCarrierTechnologyParameters);
+
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
+        kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getAcks();
+        kafkaCarrierTechnologyParameters.setAcks(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setAcks(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
+        kafkaCarrierTechnologyParameters.setGroupId(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
+        kafkaCarrierTechnologyParameters.setProducerTopic(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
+        kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
+        kafkaCarrierTechnologyParameters.setRetries(-1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setRetries(origIntValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
+        kafkaCarrierTechnologyParameters.setBatchSize(-1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
+        kafkaCarrierTechnologyParameters.setLingerTime(-1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
+        kafkaCarrierTechnologyParameters.setBufferMemory(-1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
+        kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
+        kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
+        kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
+        kafkaCarrierTechnologyParameters.setKeySerializer(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
+        kafkaCarrierTechnologyParameters.setValueSerializer(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
+        kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
+        kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
+        kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        String[] blankStringList = {null, ""};
+        kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
+        kafkaCarrierTechnologyParameters.setKafkaProperties(null);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        // @formatter:off
+        String[][] kafkaProperties0 = {
+            {
+                null, "Value0"
+            }
+        };
+        // @formatter:on
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        // @formatter:off
+        String[][] kafkaProperties1 = {
+            {
+                "Property1", null
+            }
+        };
+        // @formatter:on
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        // @formatter:off
+        String[][] kafkaProperties2 = {
+            {
+                "Property1", null
+            }
+        };
+        // @formatter:on
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+        // @formatter:off
+        String[][] kafkaProperties3 = {
+            {
+                "Property1", "Value0", "Value1"
+            }
+        };
+        // @formatter:on
+
+        kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
+        assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+        kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+        assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
     }
 }