Fix for Kafka Consumer is not safe error 32/74332/2 1.1.14
authorsunil unnava <sunil.unnava@att.com>
Thu, 6 Dec 2018 11:22:32 +0000 (06:22 -0500)
committersunil unnava <su622b@att.com>
Thu, 6 Dec 2018 11:45:36 +0000 (11:45 +0000)
Issue-ID: DMAAP-896
Change-Id: I085dbad1248790796e220267cb3e603ecc6c1067
Signed-off-by: sunil unnava <sunil.unnava@att.com>
pom.xml
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
version.properties

diff --git a/pom.xml b/pom.xml
index affe8f7..c213101 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.onap.dmaap.messagerouter.msgrtr</groupId>
        <artifactId>msgrtr</artifactId>
-       <version>1.1.13-SNAPSHOT</version>
+       <version>1.1.14-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>dmaap-messagerouter-msgrtr</name>
        <description>Message Router - Restful interface built for kafka</description>
index 347f625..2ec323e 100644 (file)
@@ -42,8 +42,7 @@ import org.apache.kafka.common.KafkaException;
 import org.onap.dmaap.dmf.mr.backends.Consumer;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
 
-
-
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 
@@ -83,6 +82,12 @@ public class Kafka011Consumer implements Consumer {
                state = Kafka011Consumer.State.OPENED;
                kConsumer = cc;
                fKafkaLiveLockAvoider = klla;
+               
+               String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                               "consumer.timeout");
+               if (null != consumerTimeOut) {
+                       consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
+               }
                synchronized (kConsumer) {
                        kConsumer.subscribe(Arrays.asList(topic));
                }
@@ -147,7 +152,7 @@ public class Kafka011Consumer implements Consumer {
                ExecutorService service = Executors.newSingleThreadExecutor();
                service.execute(future);
                try {
-                       future.get(5, TimeUnit.SECONDS); // wait 1
+                       future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
                        // second
                } catch (TimeoutException ex) {
                        // timed out. Try to stop the code if possible.
@@ -370,6 +375,7 @@ public class Kafka011Consumer implements Consumer {
        private long offset;
        private Kafka011Consumer.State state;
        private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+       private int consumerPollTimeOut=5;
        private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
        private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
        
index 3dd3954..c9b51c7 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=1
-patch=13
+patch=14
 
 base_version=${major}.${minor}.${patch}