import com.att.dmf.mr.backends.Consumer;
import com.att.dmf.mr.constants.CambriaConstants;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
OPENED, CLOSED
}
- // @Autowired
- // KafkaLiveLockAvoider kafkaLiveLockAvoider;
+
/**
* KafkaConsumer() is constructor. It has following 4 parameters:-
*
records = kConsumer.poll(500);
}
for (ConsumerRecord<String, String> record : records) {
- // foundMsgs = true;
+
fPendingMsgs.offer(record);
}
}
- // return null;
+
return true;
}
};
/**
* setting the kafkaConsumer state to closed
*/
- // public synchronized boolean close() {
+
public boolean close() {
if (getState() == Kafka011Consumer.State.CLOSED) {
return true;
}
- // fConnector.shutdown();
+
boolean retVal = kafkaConnectorshuttask();
return retVal;
public Boolean call() throws Exception {
try {
- // System.out.println("attempt to delete " + kConsumer);
+
kConsumer.close();
} catch (Exception e) {
log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
- // return false;
+
}
log.info("Kafka connection closure with in 15 seconds by a Executors task");
this.state = state;
}
- // private ConsumerConnector fConnector;
+
private final String fTopic;
private final String fGroup;
private final String fId;
private final String fLogTag;
- // private final KafkaStream<byte[], byte[]> fStream;
+
private KafkaConsumer<String, String> kConsumer;
private long fCreateTimeMs;
private long fLastTouch;
private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
- //private ArrayList<Kafka011Consumer> fconsumerList;
+
@Override
public void commitOffsets() {
if (getState() == Kafka011Consumer.State.CLOSED) {
return;
}
kConsumer.commitSync();
- // fConsumer.close();
+
}
public void setConsumerCache(KafkaConsumerCache cache) {
}
- //@Override
- //public Message nextMessage(ArrayList<?> l) {
- // TODO Auto-generated method stub
- //return null;
- //}
+
}