X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbackends%2Fkafka%2FKafkaConsumer.txt;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbackends%2Fkafka%2FKafkaConsumer.txt;h=dd6259f94b99ef846e6353eb9ae6fd478134c774;hb=f052f758c29fa92c21578514202268f3b0430a1a;hp=0000000000000000000000000000000000000000;hpb=388410f85e44de41ef9b340f44c0b4b1b0af59b6;p=dmaap%2Fmessagerouter%2Fmessageservice.git diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt new file mode 100644 index 0000000..dd6259f --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt @@ -0,0 +1,386 @@ +package com.att.dmf.mr.backends.kafka; + +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaException; + +import com.att.dmf.mr.backends.Consumer; + +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * A consumer instance that's created per-request. These are stateless so that + * clients can connect to this service as a proxy. + * + * @author peter + * + */ +public class KafkaConsumer implements Consumer { + private enum State { + OPENED, CLOSED + } + + /** + * KafkaConsumer() is constructor. It has following 4 parameters:- + * + * @param topic + * @param group + * @param id + * @param cc + * + */ + + public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception { + fTopic = topic; + fGroup = group; + fId = id; + // fConnector = cc; + + fCreateTimeMs = System.currentTimeMillis(); + fLastTouch = fCreateTimeMs; + fPendingMsgs = new LinkedBlockingQueue> (); + fLogTag = fGroup + "(" + fId + ")/" + fTopic; + offset = 0; + + state = KafkaConsumer.State.OPENED; + + // final Map topicCountMap = new HashMap(); + // topicCountMap.put(fTopic, 1); + // log.info(fLogTag +" kafka Consumer started at " + // +System.currentTimeMillis()); + // final Map>> consumerMap = + // fConnector.createMessageStreams(topicCountMap); + // final List> streams = + // consumerMap.get(fTopic); + + kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop); + // System.out.println("I am in Consumer APP " + topic + "-- " + + // fConsumer); + kConsumer.subscribe(Arrays.asList(topic)); + log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); + System.out.println("-----id " +id); + + + try { ConsumerRecords records = + kConsumer.poll(500); System.out.println("---" + + records.count()); + + for (ConsumerRecord record : records) { + System.out.printf("offset = %d, key = %s, value = %s", + record.offset(), record.key(), record.value()); String t = + record.value(); + + } + }catch(Exception e){ + System.out.println( e); + } + System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); + kConsumer.commitSync(); + // fConsumer.close(); + + + /* + * ConsumerRecords records = fConsumer.poll(500); + * System.out.println("---" + records.count()); + * + * for (ConsumerRecord record : records) { + * System.out.printf("offset = %d, key = %s, value = %s", + * record.offset(), record.key(), record.value()); String t = + * record.value(); + * + * } + * + * + * fConsumer.commitSync(); fConsumer.close(); + */ + + // fStream = streams.iterator().next(); + } + + + + private Consumer.Message makeMessage ( final ConsumerRecord msg ) + { + return new Consumer.Message() + { + @Override + public long getOffset () + { + return msg.offset (); + } + + @Override + public String getMessage () + { + return new String ( msg.value () ); + } + }; + } + + @Override + public synchronized Consumer.Message nextMessage () + { + + try + { + if ( fPendingMsgs.size () > 0 ) + { + return makeMessage ( fPendingMsgs.take () ); + } + } + catch ( InterruptedException x ) + { + log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x ); + } + + + try + { + boolean foundMsgs = false; + System.out.println("entering into pollingWWWWWWWWWWWWWWWWW"); + final ConsumerRecords records = kConsumer.poll ( 100 ); + System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX...."); + for ( ConsumerRecord record : records ) + { + foundMsgs = true; + fPendingMsgs.offer ( record ); + } + + } + catch ( KafkaException x ) + { + log.debug ( fLogTag + ": KafkaException " + x.getMessage () ); + + } + catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x ) + { + log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () ); + + } + + return null; + } + + + + /** + * getName() method returns string type value. returns 3 parameters in + * string:- fTopic,fGroup,fId + * + * @Override + */ + public String getName() { + return fTopic + " : " + fGroup + " : " + fId; + } + + /** + * getCreateTimeMs() method returns long type value. returns fCreateTimeMs + * variable value + * + * @Override + * + */ + public long getCreateTimeMs() { + return fCreateTimeMs; + } + + public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() { + return kConsumer; + } + + /** + * getLastAccessMs() method returns long type value. returns fLastTouch + * variable value + * + * @Override + * + */ + public long getLastAccessMs() { + return fLastTouch; + } + + + + /** + * getOffset() method returns long type value. returns offset variable value + * + * @Override + * + */ + public long getOffset() { + return offset; + } + + /** + * commit offsets commitOffsets() method will be called on closed of + * KafkaConsumer. + * + * @Override + * + * + * public void commitOffsets() { if (getState() == + * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called + * on closed KafkaConsumer " + getName()); return; } + * fConnector.commitOffsets(); } + */ + + /** + * updating fLastTouch with current time in ms + */ + public void touch() { + fLastTouch = System.currentTimeMillis(); + } + + /** + * getLastTouch() method returns long type value. returns fLastTouch + * variable value + * + */ + public long getLastTouch() { + return fLastTouch; + } + + /** + * setting the kafkaConsumer state to closed + */ + public synchronized boolean close() { + + if (getState() == KafkaConsumer.State.CLOSED) { + + log.warn("close() called on closed KafkaConsumer " + getName()); + return true; + } + + setState(KafkaConsumer.State.CLOSED); + // fConnector.shutdown(); + boolean retVal = kafkaConnectorshuttask(); + return retVal; + + } + + /* time out if the kafka shutdown fails for some reason */ + + private boolean kafkaConnectorshuttask() { + Callable run = new Callable() { + @Override + public Boolean call() throws Exception { + // your code to be timed + try { + System.out.println("consumer closing....." + kConsumer); + kConsumer.close(); + } catch (Exception e) { + log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e); + } + log.info("Kafka connection closure with in 15 seconds by a Executors task"); + return true; + } + }; + + RunnableFuture future = new FutureTask(run); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(future); + Boolean result = null; + try { + result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1 + // second + } catch (TimeoutException ex) { + // timed out. Try to stop the code if possible. + log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task"); + future.cancel(true); + } catch (Exception ex) { + // timed out. Try to stop the code if possible. + log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex); + future.cancel(true); + return false; + } + service.shutdown(); + return true; + } + + /** + * getConsumerGroup() returns Consumer group + * + * @return + */ + public String getConsumerGroup() { + return fGroup; + } + + /** + * getConsumerId returns Consumer Id + * + * @return + */ + public String getConsumerId() { + return fId; + } + + /** + * getState returns kafkaconsumer state + * + * @return + */ + private KafkaConsumer.State getState() { + return this.state; + } + + /** + * setState() sets the kafkaConsumer state + * + * @param state + */ + private void setState(KafkaConsumer.State state) { + this.state = state; + } + + // private ConsumerConnector fConnector; + private final String fTopic; + private final String fGroup; + private final String fId; + private final String fLogTag; + // private final KafkaStream fStream; + private final org.apache.kafka.clients.consumer.KafkaConsumer kConsumer; + private long fCreateTimeMs; + private long fLastTouch; + private long offset; + private KafkaConsumer.State state; + private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class); + private final LinkedBlockingQueue> fPendingMsgs; + // private static final Logger log = + // LoggerFactory.getLogger(KafkaConsumer.class); + + @Override + public void commitOffsets() { + if (getState() == KafkaConsumer.State.CLOSED) { + log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); + return; + } + kConsumer.commitSync(); + // fConsumer.close(); + + } + + + + @Override + public void setOffset(long offsetval) { + // TODO Auto-generated method stub + offset = offsetval; + } +}