package com.att.dmf.mr.backends.kafka; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.KafkaException; import com.att.dmf.mr.backends.Consumer; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; /** * A consumer instance that's created per-request. These are stateless so that * clients can connect to this service as a proxy. * * @author peter * */ public class KafkaConsumer implements Consumer { private enum State { OPENED, CLOSED } /** * KafkaConsumer() is constructor. It has following 4 parameters:- * * @param topic * @param group * @param id * @param cc * */ public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception { fTopic = topic; fGroup = group; fId = id; // fConnector = cc; fCreateTimeMs = System.currentTimeMillis(); fLastTouch = fCreateTimeMs; fPendingMsgs = new LinkedBlockingQueue> (); fLogTag = fGroup + "(" + fId + ")/" + fTopic; offset = 0; state = KafkaConsumer.State.OPENED; // final Map topicCountMap = new HashMap(); // topicCountMap.put(fTopic, 1); // log.info(fLogTag +" kafka Consumer started at " // +System.currentTimeMillis()); // final Map>> consumerMap = // fConnector.createMessageStreams(topicCountMap); // final List> streams = // consumerMap.get(fTopic); kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop); // System.out.println("I am in Consumer APP " + topic + "-- " + // fConsumer); kConsumer.subscribe(Arrays.asList(topic)); log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); System.out.println("-----id " +id); try { ConsumerRecords records = kConsumer.poll(500); System.out.println("---" + records.count()); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); String t = record.value(); } }catch(Exception e){ System.out.println( e); } System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); kConsumer.commitSync(); // fConsumer.close(); /* * ConsumerRecords records = fConsumer.poll(500); * System.out.println("---" + records.count()); * * for (ConsumerRecord record : records) { * System.out.printf("offset = %d, key = %s, value = %s", * record.offset(), record.key(), record.value()); String t = * record.value(); * * } * * * fConsumer.commitSync(); fConsumer.close(); */ // fStream = streams.iterator().next(); } private Consumer.Message makeMessage ( final ConsumerRecord msg ) { return new Consumer.Message() { @Override public long getOffset () { return msg.offset (); } @Override public String getMessage () { return new String ( msg.value () ); } }; } @Override public synchronized Consumer.Message nextMessage () { try { if ( fPendingMsgs.size () > 0 ) { return makeMessage ( fPendingMsgs.take () ); } } catch ( InterruptedException x ) { log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x ); } try { boolean foundMsgs = false; System.out.println("entering into pollingWWWWWWWWWWWWWWWWW"); final ConsumerRecords records = kConsumer.poll ( 100 ); System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX...."); for ( ConsumerRecord record : records ) { foundMsgs = true; fPendingMsgs.offer ( record ); } } catch ( KafkaException x ) { log.debug ( fLogTag + ": KafkaException " + x.getMessage () ); } catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x ) { log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () ); } return null; } /** * getName() method returns string type value. returns 3 parameters in * string:- fTopic,fGroup,fId * * @Override */ public String getName() { return fTopic + " : " + fGroup + " : " + fId; } /** * getCreateTimeMs() method returns long type value. returns fCreateTimeMs * variable value * * @Override * */ public long getCreateTimeMs() { return fCreateTimeMs; } public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() { return kConsumer; } /** * getLastAccessMs() method returns long type value. returns fLastTouch * variable value * * @Override * */ public long getLastAccessMs() { return fLastTouch; } /** * getOffset() method returns long type value. returns offset variable value * * @Override * */ public long getOffset() { return offset; } /** * commit offsets commitOffsets() method will be called on closed of * KafkaConsumer. * * @Override * * * public void commitOffsets() { if (getState() == * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called * on closed KafkaConsumer " + getName()); return; } * fConnector.commitOffsets(); } */ /** * updating fLastTouch with current time in ms */ public void touch() { fLastTouch = System.currentTimeMillis(); } /** * getLastTouch() method returns long type value. returns fLastTouch * variable value * */ public long getLastTouch() { return fLastTouch; } /** * setting the kafkaConsumer state to closed */ public synchronized boolean close() { if (getState() == KafkaConsumer.State.CLOSED) { log.warn("close() called on closed KafkaConsumer " + getName()); return true; } setState(KafkaConsumer.State.CLOSED); // fConnector.shutdown(); boolean retVal = kafkaConnectorshuttask(); return retVal; } /* time out if the kafka shutdown fails for some reason */ private boolean kafkaConnectorshuttask() { Callable run = new Callable() { @Override public Boolean call() throws Exception { // your code to be timed try { System.out.println("consumer closing....." + kConsumer); kConsumer.close(); } catch (Exception e) { log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e); } log.info("Kafka connection closure with in 15 seconds by a Executors task"); return true; } }; RunnableFuture future = new FutureTask(run); ExecutorService service = Executors.newSingleThreadExecutor(); service.execute(future); Boolean result = null; try { result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1 // second } catch (TimeoutException ex) { // timed out. Try to stop the code if possible. log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task"); future.cancel(true); } catch (Exception ex) { // timed out. Try to stop the code if possible. log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex); future.cancel(true); return false; } service.shutdown(); return true; } /** * getConsumerGroup() returns Consumer group * * @return */ public String getConsumerGroup() { return fGroup; } /** * getConsumerId returns Consumer Id * * @return */ public String getConsumerId() { return fId; } /** * getState returns kafkaconsumer state * * @return */ private KafkaConsumer.State getState() { return this.state; } /** * setState() sets the kafkaConsumer state * * @param state */ private void setState(KafkaConsumer.State state) { this.state = state; } // private ConsumerConnector fConnector; private final String fTopic; private final String fGroup; private final String fId; private final String fLogTag; // private final KafkaStream fStream; private final org.apache.kafka.clients.consumer.KafkaConsumer kConsumer; private long fCreateTimeMs; private long fLastTouch; private long offset; private KafkaConsumer.State state; private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class); private final LinkedBlockingQueue> fPendingMsgs; // private static final Logger log = // LoggerFactory.getLogger(KafkaConsumer.class); @Override public void commitOffsets() { if (getState() == KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); return; } kConsumer.commitSync(); // fConsumer.close(); } @Override public void setOffset(long offsetval) { // TODO Auto-generated method stub offset = offsetval; } }