* ============LICENSE_END==========================================================================
*/
-
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
- private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
+ private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "2000";
public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
- private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
+ private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "1000";
public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
import org.slf4j.Logger;
private boolean ready = false;
private int fetchPause = 5000; // Default pause between fetch - 5 seconds
protected final GeneralConfig generalConfig;
+ Admin kafkaAdminClient = null;
protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
*/
@Override
public void run() {
-
if (ready) {
running = true;
while (running) {
try {
boolean noData = true;
List<String> consumerResponse = null;
- consumerResponse = consumer.poll();
- for (String msg : consumerResponse) {
- noData = false;
- LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
- if (isMessageValid(msg)) {
- processMsg(msg);
+ if (isTopicExists(consumer.getTopicName())) {
+ consumerResponse = consumer.poll();
+ for (String msg : consumerResponse) {
+ noData = false;
+ LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
+ if (isMessageValid(msg)) {
+ processMsg(msg);
+ }
}
}
-
if (noData) {
pauseThread();
}
*/
@Override
public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
+ kafkaAdminClient = Admin.create(props);
try {
this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
}
}
+ private boolean isTopicExists(String topicName) {
+ LOG.trace("Checking for existence of topic - {}", topicName);
+ try {
+ for (String kafkaTopic : kafkaAdminClient.listTopics().names().get()) {
+ if (kafkaTopic.equals(topicName))
+ return true;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Exception in isTopicExists method - ", e);
+ }
+ return false;
+ }
+
@Override
public boolean isReady() {
return ready;