import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-
-
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
fId = id;
fCreateTimeMs = System.currentTimeMillis();
fLastTouch = fCreateTimeMs;
- fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+ fPendingMsgs = new LinkedBlockingQueue<>();
fLogTag = fGroup + "(" + fId + ")/" + fTopic;
offset = 0;
state = Kafka011Consumer.State.OPENED;
kConsumer = cc;
fKafkaLiveLockAvoider = klla;
+
+ String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "consumer.timeout");
+ if (StringUtils.isNotEmpty(consumerTimeOut)) {
+ consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
+ }
synchronized (kConsumer) {
kConsumer.subscribe(Arrays.asList(topic));
}
public synchronized Consumer.Message nextMessage() {
try {
- if (fPendingMsgs.size() > 0) {
+ if (fPendingMsgs.isEmpty()) {
return makeMessage(fPendingMsgs.take());
}
} catch (InterruptedException x) {
log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
x);
+ //Thread.currentThread().interrupt();
}
Callable<Boolean> run = new Callable<Boolean>() {
}
} catch (KafkaException x) {
- log.debug(fLogTag + ": KafkaException " + x.getMessage());
+ log.debug(fLogTag + ": KafkaException ", x);
} catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
- log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
- + x.getMessage());
+ log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
}
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(future);
try {
- future.get(5, TimeUnit.SECONDS); // wait 1
+ future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
// second
} catch (TimeoutException ex) {
+ log.error("TimeoutException in in Kafka consumer ", ex);
// timed out. Try to stop the code if possible.
String apiNodeId = null;
try {
apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
} catch (UnknownHostException e1) {
- // TODO Auto-generated catch block
- log.error("unable to get the localhost address");
+ log.error("unable to get the localhost address ", e1);
}
try {
if (fKafkaLiveLockAvoider != null)
fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
} catch (Exception e) {
- log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+ log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
}
forcePollOnConsumer();
future.cancel(true);
} catch (Exception ex) {
+ log.error("Exception in in Kafka consumer ", ex);
// timed out. Try to stop the code if possible.
future.cancel(true);
}
// second
} catch (TimeoutException ex) {
// timed out. Try to stop the code if possible.
- log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+ log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
future.cancel(true);
setState(Kafka011Consumer.State.OPENED);
} catch (Exception ex) {
// timed out. Try to stop the code if possible.
- log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
- + ex);
+ log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
future.cancel(true);
setState(Kafka011Consumer.State.OPENED);
return false;
private long offset;
private Kafka011Consumer.State state;
private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+ private int consumerPollTimeOut=5;
private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;