* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
-
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-
+
/**
* constructor initialization
- *
+ *
* @param settings
* @param metrics
* @param curator
*/
public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
- @Qualifier("curator") CuratorFramework curator,
- @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+ @Qualifier("curator") CuratorFramework curator,
+ @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
final boolean isCacheEnabled = kSetting_EnableCache;
-
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
fCache.setfApiId(apiNodeId);
fCache.startCache(mode, curator);
if(kafkaLiveLockAvoider!=null){
- kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
- fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+ kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+ fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
}
}
}
/*
* getConsumerFor
- *
+ *
* @see
* com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
* java.lang.String, java.lang.String, int, java.lang.String) This method is
*/
@Override
public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
Kafka011Consumer kc;
// To synchronize based on the consumer group.
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
+
if (fCache != null) {
fCache.signalOwnership(topic, consumerGroupName, consumerId);
}
-
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
+ consumerId);
log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
+ consumerId);
-
+
} finally {
if (locked) {
try {
fCache.dropAllConsumers();
}
-
+
private KafkaConsumerCache fCache;
private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
private String fkafkaBrokers;
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
-
+
if (null != keyVal) {
-
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
}
/**
- * Name CreateConsumerconfig
+ * Name CreateConsumerconfig
* @param topic
* @param groupId
* @param consumerId
* @return Properties
- *
+ *
* This method is to create Properties required to create kafka connection
- * Group name is replaced with different format groupid--topic to address same
- * groupids for multiple topics. Same groupid with multiple topics
+ * Group name is replaced with different format groupid--topic to address same
+ * groupids for multiple topics. Same groupid with multiple topics
* may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
*/
private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
final Properties props = new Properties();
//fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
- //Fix for CPFMF-644 :
+ //Fix for CPFMF-644 :
final String fakeGroupName = groupId + "--" + topic;
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
+ props.putAll(Utils.addSaslProps());
}
props.put("client.id", consumerId);
/**
* putting values in hashmap like consumer timeout, zookeeper time out, etc
- *
+ *
* @param setting
*/
private static void populateKafkaInternalDefaultsMap() { }
/*
* The starterIncremnt value is just to emulate calling certain consumers,
* in this test app all the consumers are local
- *
+ *
*/
private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
@SuppressWarnings("rawtypes")
@Override
public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
// TODO Auto-generated method stub
return null;
}