Prohibit kafka client library from generating WARN message burst 59/136459/3
authorRavi Pendurty <ravi.pendurty@highstreet-technologies.com>
Mon, 13 Nov 2023 12:09:31 +0000 (17:39 +0530)
committerHerbert Eiselt <herbert.eiselt@highstreet-technologies.com>
Wed, 6 Dec 2023 17:30:59 +0000 (17:30 +0000)
Query for topic existence before polling

Issue-ID: CCSDK-3962
Change-Id: I579f9133b01caf5ba9268d9e25d455778d5efcc4
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java

index 3b33944..92367e6 100644 (file)
@@ -17,7 +17,6 @@
  * ============LICENSE_END==========================================================================
  */
 
-
 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
 
 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
@@ -36,10 +35,10 @@ public abstract class MessageConfig implements Configuration {
     private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
 
     public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
-    private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
+    private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "2000";
 
     public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
-    private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
+    private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "1000";
 
     public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
     private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
index 249eb61..7687027 100644 (file)
@@ -24,6 +24,9 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
 import org.slf4j.Logger;
@@ -42,6 +45,7 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
     private boolean ready = false;
     private int fetchPause = 5000; // Default pause between fetch - 5 seconds
     protected final GeneralConfig generalConfig;
+    Admin kafkaAdminClient = null;
 
     protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
         this.generalConfig = generalConfig;
@@ -54,22 +58,22 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
      */
     @Override
     public void run() {
-
         if (ready) {
             running = true;
             while (running) {
                 try {
                     boolean noData = true;
                     List<String> consumerResponse = null;
-                    consumerResponse = consumer.poll();
-                    for (String msg : consumerResponse) {
-                        noData = false;
-                        LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
-                        if (isMessageValid(msg)) {
-                            processMsg(msg);
+                    if (isTopicExists(consumer.getTopicName())) {
+                        consumerResponse = consumer.poll();
+                        for (String msg : consumerResponse) {
+                            noData = false;
+                            LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
+                            if (isMessageValid(msg)) {
+                                processMsg(msg);
+                            }
                         }
                     }
-
                     if (noData) {
                         pauseThread();
                     }
@@ -103,6 +107,9 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
      */
     @Override
     public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
+        kafkaAdminClient = Admin.create(props);
 
         try {
             this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
@@ -122,6 +129,19 @@ public abstract class StrimziKafkaVESMsgConsumerImpl
         }
     }
 
+    private boolean isTopicExists(String topicName) {
+        LOG.trace("Checking for existence of topic - {}", topicName);
+        try {
+            for (String kafkaTopic : kafkaAdminClient.listTopics().names().get()) {
+                if (kafkaTopic.equals(topicName))
+                    return true;
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception in isTopicExists method - ", e);
+        }
+        return false;
+    }
+
     @Override
     public boolean isReady() {
         return ready;
index 352db03..80e232a 100644 (file)
@@ -68,7 +68,7 @@ public class VESMsgKafkaConsumer {
      */
     public List<String> poll() {
         List<String> msgs = new ArrayList<>();
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout));
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeout));
         for (ConsumerRecord<String, String> rec : records) {
             msgs.add(rec.value());
         }