import java.util.Collection;
import java.util.Properties;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"};
private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ private static final String DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getCanonicalName();
// Parameter property map tokens
private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer";
private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer";
private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
+ private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class";
// kafka carrier parameters
private String bootstrapServers = DEFAULT_BOOT_SERVERS;
private String valueSerializer = DEFAULT_STRING_SERZER;
private String keyDeserializer = DEFAULT_STRING_DESZER;
private String valueDeserializer = DEFAULT_STRING_DESZER;
+ private String partitionerClass = DEFAULT_PARTITIONR_CLASS;
// @formatter:on
/**
kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+ kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
return kafkaProperties;
}
return valueDeserializer;
}
+ /**
+ * Gets the value deserializer.
+ *
+ * @return the value deserializer
+ */
+ public String getPartitionerClass() {
+ return partitionerClass;
+ }
+
/*
* (non-Javadoc)
*
/**
* Validate that string parameters are correct.
- *
+ *
* @param result the result of the validation
*/
private void validateStringParameters(final GroupValidationResult result) {
result.setResult("producerTopic", ValidationStatus.INVALID,
SPECIFY_AS_STRING_MESSAGE);
}
+
+ if (isNullOrBlank(partitionerClass)) {
+ result.setResult("partitionerClass", ValidationStatus.INVALID,
+ SPECIFY_AS_STRING_MESSAGE);
+ }
}
/**
* Check if numeric parameters are valid.
- *
+ *
* @param result the result of the validation
*/
private void validateNumericParameters(final GroupValidationResult result) {
/**
* Validate the serializers and deserializers.
- *
+ *
* @param result the result of the validation.
*/
private void validateSerializersAndDeserializers(final GroupValidationResult result) {