<modelVersion>4.0.0</modelVersion>
<groupId>org.onap.dmaap.messagerouter.msgrtr</groupId>
<artifactId>msgrtr</artifactId>
- <version>1.1.13-SNAPSHOT</version>
+ <version>1.1.14-SNAPSHOT</version>
<packaging>jar</packaging>
<name>dmaap-messagerouter-msgrtr</name>
<description>Message Router - Restful interface built for kafka</description>
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-
-
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
state = Kafka011Consumer.State.OPENED;
kConsumer = cc;
fKafkaLiveLockAvoider = klla;
+
+ String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "consumer.timeout");
+ if (null != consumerTimeOut) {
+ consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
+ }
synchronized (kConsumer) {
kConsumer.subscribe(Arrays.asList(topic));
}
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(future);
try {
- future.get(5, TimeUnit.SECONDS); // wait 1
+ future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
// second
} catch (TimeoutException ex) {
// timed out. Try to stop the code if possible.
private long offset;
private Kafka011Consumer.State state;
private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+ private int consumerPollTimeOut=5;
private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;