Fix Kafka partiion support in APEX 71/84871/1
authorliamfallon <liam.fallon@est.tech>
Wed, 10 Apr 2019 12:52:37 +0000 (12:52 +0000)
committerliamfallon <liam.fallon@est.tech>
Wed, 10 Apr 2019 12:52:37 +0000 (12:52 +0000)
The kafka producer property ‘partitioner.class’ should be configurable via
the EngineConfig.json file so that a specialized or custom Kafka partitioner
can be used.

Issue-ID: POLICY-1627
Change-Id: Ic36ccdf3d244ab932b58c3e2ae1cd668249726c5
Signed-off-by: liamfallon <liam.fallon@est.tech>
plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java

index 8517416..6aa9d53 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
 
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
 import org.onap.policy.common.parameters.GroupValidationResult;
 import org.onap.policy.common.parameters.ValidationStatus;
@@ -64,6 +65,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
     private static final String[] DEFAULT_CONS_TOPICLIST   = {"apex-in"};
     private static final String   DEFAULT_STRING_SERZER    = "org.apache.kafka.common.serialization.StringSerializer";
     private static final String   DEFAULT_STRING_DESZER    = "org.apache.kafka.common.serialization.StringDeserializer";
+    private static final String   DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getCanonicalName();
 
     // Parameter property map tokens
     private static final String PROPERTY_BOOTSTRAP_SERVERS  = "bootstrap.servers";
@@ -80,6 +82,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
     private static final String PROPERTY_VALUE_SERIALIZER   = "value.serializer";
     private static final String PROPERTY_KEY_DESERIALIZER   = "key.deserializer";
     private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
+    private static final String PROPERTY_PARTITIONER_CLASS  = "partitioner.class";
 
     // kafka carrier parameters
     private String   bootstrapServers  = DEFAULT_BOOT_SERVERS;
@@ -99,6 +102,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
     private String   valueSerializer   = DEFAULT_STRING_SERZER;
     private String   keyDeserializer   = DEFAULT_STRING_DESZER;
     private String   valueDeserializer = DEFAULT_STRING_DESZER;
+    private String   partitionerClass  = DEFAULT_PARTITIONR_CLASS;
     // @formatter:on
 
     /**
@@ -130,6 +134,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
         kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
         kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
         kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+        kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
 
         return kafkaProperties;
     }
@@ -314,6 +319,15 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
         return valueDeserializer;
     }
 
+    /**
+     * Gets the value deserializer.
+     *
+     * @return the value deserializer
+     */
+    public String getPartitionerClass() {
+        return partitionerClass;
+    }
+
     /*
      * (non-Javadoc)
      *
@@ -336,7 +350,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
 
     /**
      * Validate that string parameters are correct.
-     * 
+     *
      * @param result the result of the validation
      */
     private void validateStringParameters(final GroupValidationResult result) {
@@ -358,11 +372,16 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
             result.setResult("producerTopic", ValidationStatus.INVALID,
                             SPECIFY_AS_STRING_MESSAGE);
         }
+
+        if (isNullOrBlank(partitionerClass)) {
+            result.setResult("partitionerClass", ValidationStatus.INVALID,
+                            SPECIFY_AS_STRING_MESSAGE);
+        }
     }
 
     /**
      * Check if numeric parameters are valid.
-     * 
+     *
      * @param result the result of the validation
      */
     private void validateNumericParameters(final GroupValidationResult result) {
@@ -404,7 +423,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
 
     /**
      * Validate the serializers and deserializers.
-     * 
+     *
      * @param result the result of the validation.
      */
     private void validateSerializersAndDeserializers(final GroupValidationResult result) {