Add duplicate check, examples for kafka Properties 43/90943/5
authorliamfallon <liam.fallon@est.tech>
Fri, 5 Jul 2019 13:41:03 +0000 (13:41 +0000)
committerliamfallon <liam.fallon@est.tech>
Fri, 5 Jul 2019 13:41:03 +0000 (13:41 +0000)
Added checks for dealing with duplication of specification of properties
explicitly and in kafkaPropertes
Added examples for kafkaProperties
Added documentation for kafkaProperties

Issue-ID: POLICY-1818
Change-Id: Icbe01d6f1d25d4570dcc85cc3db28588743b9c41
Signed-off-by: liamfallon <liam.fallon@est.tech>
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
testsuites/integration/integration-common/src/main/resources/examples/config/SampleDomain/Kafka2KafkaJsonEvent.json

index 927d79e..9d9acf6 100644 (file)
@@ -154,26 +154,28 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
      * @return the kafka producer properties
      */
     public Properties getKafkaProducerProperties() {
-        final Properties returnKafkaProperties = new Properties();
+        final Properties retKafkaProps = new Properties();
 
         // Add properties from the Kafka property array
         if (kafkaProperties != null) {
             for (int i = 0; i < kafkaProperties.length; i++) {
-                returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+                retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
             }
         }
 
-        returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
-        returnKafkaProperties.put(PROPERTY_ACKS, acks);
-        returnKafkaProperties.put(PROPERTY_RETRIES, retries);
-        returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
-        returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
-        returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
-        returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
-        returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
-        returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
-        return returnKafkaProperties;
+        // @formatter:off
+        putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+        putExplicitProperty(retKafkaProps, PROPERTY_ACKS,              acks,             DEFAULT_ACKS);
+        putExplicitProperty(retKafkaProps, PROPERTY_RETRIES,           retries,          DEFAULT_RETRIES);
+        putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE,        batchSize,        DEFAULT_BATCH_SIZE);
+        putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME,       lingerTime,       DEFAULT_LINGER_TIME);
+        putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY,     bufferMemory,     DEFAULT_BUFFER_MEMORY);
+        putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER,    keySerializer,    DEFAULT_STRING_SERZER);
+        putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER,  valueSerializer,  DEFAULT_STRING_SERZER);
+        putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
+        // @formatter:on
+
+        return retKafkaProps;
     }
 
     /**
@@ -182,24 +184,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
      * @return the kafka consumer properties
      */
     public Properties getKafkaConsumerProperties() {
-        final Properties returnKafkaProperties = new Properties();
+        final Properties retKafkaProps = new Properties();
 
         // Add properties from the Kafka property array
         if (kafkaProperties != null) {
             for (int i = 0; i < kafkaProperties.length; i++) {
-                returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+                retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
             }
         }
 
-        returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
-        returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
-        returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
-        returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
-        returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
-        returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
-        returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
-        return returnKafkaProperties;
+        // @formatter:off
+        putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS,  bootstrapServers, DEFAULT_BOOT_SERVERS);
+        putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID,           groupId,          DEFAULT_GROUP_ID);
+        putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
+        putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME,   autoCommitTime,   DEFAULT_AUTO_COMMIT_TIME);
+        putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT,    sessionTimeout,   DEFAULT_SESSION_TIMEOUT);
+        putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER,   keyDeserializer,  DEFAULT_STRING_DESZER);
+        putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
+        // @formatter:on
+
+        return retKafkaProps;
     }
 
     /**
@@ -285,4 +289,31 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
             }
         }
     }
+
+    /**
+     * Put a property into the properties if it is not already defined and is not the default value.
+     *
+     * @param returnKafkaProperties the properties to set the value in
+     * @param property the property to put
+     * @param value the value of the property to put
+     * @param defaultValue the default value of the property to put
+     */
+    private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
+            final Object value, final Object defaultValue) {
+
+        // Check if the property is already in the properties
+        if (!returnKafkaProperties.containsKey(property)) {
+            // Not found, so add it
+            returnKafkaProperties.setProperty(property, value.toString());
+        }
+        else {
+            // Found, only overwrite if the property does not have the default value
+            if (value == null) {
+                returnKafkaProperties.setProperty(property, defaultValue.toString());
+            }
+            else if (!value.toString().contentEquals(defaultValue.toString())) {
+                returnKafkaProperties.setProperty(property, value.toString());
+            }
+        }
+    }
 }
index 03e1139..94586e9 100644 (file)
@@ -1,6 +1,7 @@
 //
 // ============LICENSE_START=======================================================
 //  Copyright (C) 2016-2018 Ericsson. All rights reserved.
+//  Modifications Copyright (C) 2019 Nordix Foundation.
 // ================================================================================
 // This file is licensed under the CREATIVE COMMONS ATTRIBUTION 4.0 INTERNATIONAL LICENSE
 // Full license text at https://creativecommons.org/licenses/by/4.0/legalcode
@@ -45,7 +46,8 @@ The input is uni-directional, an engine will only receive events from the input
     "keyDeserializer"   :
         "org.apache.kafka.common.serialization.StringDeserializer", <9>
     "valueDeserializer" :
-        "org.apache.kafka.common.serialization.StringDeserializer" <10>
+        "org.apache.kafka.common.serialization.StringDeserializer", <10>
+    "kafkaProperties"   : [] <11>
   }
 }
 ----
@@ -60,6 +62,8 @@ The input is uni-directional, an engine will only receive events from the input
 <8> consumer topic list
 <9> key for the Kafka de-serializer
 <10> value for the Kafka de-serializer
+<11> an optional list of name value pairs of properties to be passed transparently to Kafka.
+This field need not be specified, can be set to null, or to an empty list as here.
 
 
 === Kafka Output
@@ -85,7 +89,11 @@ The output is uni-directional, an engine will send events to the output but not
     "keySerializer"     :
         "org.apache.kafka.common.serialization.StringSerializer", <9>
     "valueSerializer"   :
-        "org.apache.kafka.common.serialization.StringSerializer" <10>
+        "org.apache.kafka.common.serialization.StringSerializer", <10>
+    "kafkaProperties": [ <11>
+        ["message.max.bytes", 1000000],
+        ["compression.codec", "none"]
+    ]
   }
 }
 ----
@@ -100,3 +108,7 @@ The output is uni-directional, an engine will send events to the output but not
 <8> producer topic
 <9> key for the Kafka serializer
 <10> value for the Kafka serializer
+<11> an optional list of name value pairs of properties to be passed transparently to Kafka. If a property appears in
+the _kafkaProperties_ field and is also explicitly specified to a non-default value (such as _lingerTime_
+and _linger.ms_) the explictly specified value of the property is used rather than the value specified in the
+_kafkaProperties_ list.
index 2f5405b..5b6ec33 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Samsung. All rights reserved.
+ *  Modifications Copyright (C) 2019 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -45,7 +46,7 @@ public class KafkaCarrierTechnologyParametersTest {
         Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
         assertNotNull(kafkaProducerProperties);
         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
-        assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+        assertEquals("1", kafkaProducerProperties.get("linger.ms"));
         assertEquals(null, kafkaProducerProperties.get("group.id"));
         assertEquals(null, kafkaProducerProperties.get("Property0"));
         assertEquals(null, kafkaProducerProperties.get("Property1"));
@@ -66,7 +67,7 @@ public class KafkaCarrierTechnologyParametersTest {
         kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
         assertNotNull(kafkaProducerProperties);
         assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
-        assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+        assertEquals("1", kafkaProducerProperties.get("linger.ms"));
         assertEquals(null, kafkaProducerProperties.get("group.id"));
         assertEquals("Value0", kafkaProducerProperties.get("Property0"));
         assertEquals("Value1", kafkaProducerProperties.get("Property1"));
@@ -222,7 +223,7 @@ public class KafkaCarrierTechnologyParametersTest {
         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
 
-        String[] blankStringList = {null, ""};
+        String[] blankStringList = { null, "" };
         kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
         kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
@@ -239,7 +240,7 @@ public class KafkaCarrierTechnologyParametersTest {
         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
 
-        // @formatter:off
+        // @formatter:offkafkaCarrierTechnologyParameters
         String[][] kafkaProperties0 = {
             {
                 null, "Value0"
@@ -290,6 +291,87 @@ public class KafkaCarrierTechnologyParametersTest {
         assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
         kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
         assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+    }
+
+    @Test
+    public void testExplicitImplicit() {
+        KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
+        assertNotNull(kafkaCtp);
+
+        assertTrue(kafkaCtp.validate().isValid());
+
+        // @formatter:off
+        assertEquals("localhost:9092",   kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+        assertEquals("all",              kafkaCtp.getKafkaProducerProperties().get("acks"));
+        assertEquals("0",                kafkaCtp.getKafkaProducerProperties().get("retries"));
+        assertEquals("16384",            kafkaCtp.getKafkaProducerProperties().get("batch.size"));
+        assertEquals("1",                kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
+        assertEquals("33554432",         kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
+        assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
+        assertEquals("true",             kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
+        assertEquals("1000",             kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
+        assertEquals("30000",            kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
+        // @formatter:on
+
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+                kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+                kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
+        assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+                kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
+        assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+                kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
+        assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+                kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
+
+        // @formatter:off
+        String[][] kafkaProperties0 = {
+            {
+                "bootstrap.servers", "localhost:9092"
+            }
+        };
+        // @formatter:on
+
+        kafkaCtp.setBootstrapServers(null);
+        kafkaCtp.setKafkaProperties(kafkaProperties0);
+        assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+        // @formatter:off
+        String[][] kafkaProperties1 = {
+            {
+                "bootstrap.servers", "localhost:9999"
+            }
+        };
+        // @formatter:on
+
+        kafkaCtp = new KafkaCarrierTechnologyParameters();
+        kafkaCtp.setKafkaProperties(kafkaProperties1);
+        assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+        // @formatter:off
+        String[][] kafkaProperties2 = {
+            {
+                "bootstrap.servers", "localhost:8888"
+            }
+        };
+        // @formatter:on
+
+        kafkaCtp = new KafkaCarrierTechnologyParameters();
+        kafkaCtp.setBootstrapServers("localhost:9092");
+        kafkaCtp.setKafkaProperties(kafkaProperties2);
+        assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+        // @formatter:off
+        String[][] kafkaProperties3 = {
+            {
+                "bootstrap.servers", "localhost:5555"
+            }
+        };
+        // @formatter:on
 
+        kafkaCtp = new KafkaCarrierTechnologyParameters();
+        kafkaCtp.setBootstrapServers("localhost:7777");
+        kafkaCtp.setKafkaProperties(kafkaProperties3);
+        assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
     }
 }
index 6b13db7..5f1a9c7 100644 (file)
@@ -28,7 +28,8 @@
                     "bufferMemory": 33554432,
                     "producerTopic": "apex-out-json",
                     "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
-                    "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer"
+                    "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
+                    "kafkaProperties": []
                 }
             },
             "eventProtocolParameters": {
                         "apex-in-json"
                     ],
                     "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
-                    "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+                    "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+                    "kafkaProperties": [
+                        ["message.max.bytes", 1000000],
+                        ["compression.codec", "none"]
+                    ]
                 }
             },
             "eventProtocolParameters": {