DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / KafkaConsumer.txt
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumer.txt
new file mode 100644 (file)
index 0000000..dd6259f
--- /dev/null
@@ -0,0 +1,386 @@
+package com.att.dmf.mr.backends.kafka;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+
+import com.att.dmf.mr.backends.Consumer;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * A consumer instance that's created per-request. These are stateless so that
+ * clients can connect to this service as a proxy.
+ * 
+ * @author peter
+ *
+ */
+public class KafkaConsumer implements Consumer {
+       private enum State {
+               OPENED, CLOSED
+       }
+
+       /**
+        * KafkaConsumer() is constructor. It has following 4 parameters:-
+        * 
+        * @param topic
+        * @param group
+        * @param id
+        * @param cc
+        * 
+        */
+
+       public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception {
+               fTopic = topic;
+               fGroup = group;
+               fId = id;
+               // fConnector = cc;
+
+               fCreateTimeMs = System.currentTimeMillis();
+               fLastTouch = fCreateTimeMs;
+               fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> ();
+               fLogTag = fGroup + "(" + fId + ")/" + fTopic;
+               offset = 0;
+
+               state = KafkaConsumer.State.OPENED;
+
+               // final Map<String, Integer> topicCountMap = new HashMap<String,
+               // Integer>();
+               // topicCountMap.put(fTopic, 1);
+               // log.info(fLogTag +" kafka Consumer started at "
+               // +System.currentTimeMillis());
+               // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+               // fConnector.createMessageStreams(topicCountMap);
+               // final List<KafkaStream<byte[], byte[]>> streams =
+               // consumerMap.get(fTopic);
+
+               kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
+               // System.out.println("I am in Consumer APP " + topic + "-- " +
+               // fConsumer);
+               kConsumer.subscribe(Arrays.asList(topic));
+               log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+       System.out.println("-----id " +id);
+       
+               
+         try { ConsumerRecords<String, String> records =
+                                                  kConsumer.poll(500); System.out.println("---" +
+                                                   records.count());
+                                         
+                                                   for (ConsumerRecord<String, String> record : records) {
+                                                   System.out.printf("offset = %d, key = %s, value = %s",
+                                                   record.offset(), record.key(), record.value()); String t =
+                                                   record.value();
+                                         
+                                                   }
+                 }catch(Exception e){
+                         System.out.println( e);
+                 }
+                       System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
+                       kConsumer.commitSync();
+               //  fConsumer.close();  
+       
+
+               /*
+                * ConsumerRecords<String, String> records = fConsumer.poll(500);
+                * System.out.println("---" + records.count());
+                * 
+                * for (ConsumerRecord<String, String> record : records) {
+                * System.out.printf("offset = %d, key = %s, value = %s",
+                * record.offset(), record.key(), record.value()); String t =
+                * record.value();
+                * 
+                * }
+                * 
+                * 
+                * fConsumer.commitSync(); fConsumer.close();
+                */
+
+               // fStream = streams.iterator().next();
+       }
+       
+       
+       
+       private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg )
+       {
+               return new Consumer.Message()
+               {
+                       @Override
+                       public long getOffset ()
+                       {
+                               return msg.offset ();
+                       }
+                       
+                       @Override
+                       public String getMessage ()
+                       {
+                               return new String ( msg.value () );
+                       }
+               };
+       }
+       
+       @Override
+       public synchronized Consumer.Message nextMessage ()
+       {
+               
+               try
+               {
+                       if ( fPendingMsgs.size () > 0 )
+                       {
+                               return makeMessage ( fPendingMsgs.take () );
+                       }
+               }
+               catch ( InterruptedException x )
+               {
+                       log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x );
+               }
+               
+               
+                       try
+                       {
+                               boolean foundMsgs = false;
+                               System.out.println("entering into pollingWWWWWWWWWWWWWWWWW");
+                               final ConsumerRecords<String,String> records = kConsumer.poll ( 100 );
+                               System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX....");
+                               for ( ConsumerRecord<String,String> record : records )
+                               {
+                                       foundMsgs = true;
+                                       fPendingMsgs.offer ( record );
+                               }
+                       
+                       }
+                       catch ( KafkaException x )
+                       {
+                               log.debug ( fLogTag + ": KafkaException " + x.getMessage () );
+                               
+                       }
+                       catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x )
+                       {
+                               log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () );
+                       
+                       }
+                               
+               return null;
+       }
+       
+       
+
+       /**
+        * getName() method returns string type value. returns 3 parameters in
+        * string:- fTopic,fGroup,fId
+        * 
+        * @Override
+        */
+       public String getName() {
+               return fTopic + " : " + fGroup + " : " + fId;
+       }
+
+       /**
+        * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
+        * variable value
+        * 
+        * @Override
+        * 
+        */
+       public long getCreateTimeMs() {
+               return fCreateTimeMs;
+       }
+
+       public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() {
+               return kConsumer;
+       }
+
+       /**
+        * getLastAccessMs() method returns long type value. returns fLastTouch
+        * variable value
+        * 
+        * @Override
+        * 
+        */
+       public long getLastAccessMs() {
+               return fLastTouch;
+       }
+
+       
+
+       /**
+        * getOffset() method returns long type value. returns offset variable value
+        * 
+        * @Override
+        * 
+        */
+       public long getOffset() {
+               return offset;
+       }
+
+       /**
+        * commit offsets commitOffsets() method will be called on closed of
+        * KafkaConsumer.
+        * 
+        * @Override
+        * 
+        *
+        *                      public void commitOffsets() { if (getState() ==
+        *           KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
+        *           on closed KafkaConsumer " + getName()); return; }
+        *           fConnector.commitOffsets(); }
+        */
+
+       /**
+        * updating fLastTouch with current time in ms
+        */
+       public void touch() {
+               fLastTouch = System.currentTimeMillis();
+       }
+
+       /**
+        * getLastTouch() method returns long type value. returns fLastTouch
+        * variable value
+        * 
+        */
+       public long getLastTouch() {
+               return fLastTouch;
+       }
+
+       /**
+        * setting the kafkaConsumer state to closed
+        */
+       public synchronized boolean close() {
+
+               if (getState() == KafkaConsumer.State.CLOSED) {
+
+                       log.warn("close() called on closed KafkaConsumer " + getName());
+                       return true;
+               }
+
+               setState(KafkaConsumer.State.CLOSED);
+               // fConnector.shutdown();
+               boolean retVal = kafkaConnectorshuttask();
+               return retVal;
+
+       }
+
+       /* time out if the kafka shutdown fails for some reason */
+
+       private boolean kafkaConnectorshuttask() {
+               Callable<Boolean> run = new Callable<Boolean>() {
+                       @Override
+                       public Boolean call() throws Exception {
+                               // your code to be timed
+                               try {
+                               System.out.println("consumer closing....." + kConsumer);
+                                       kConsumer.close();
+                               } catch (Exception e) {
+                                       log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
+                               }
+                               log.info("Kafka connection closure with in 15 seconds by a Executors task");
+                               return true;
+                       }
+               };
+
+               RunnableFuture future = new FutureTask(run);
+               ExecutorService service = Executors.newSingleThreadExecutor();
+               service.execute(future);
+               Boolean result = null;
+               try {
+                       result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1
+                                                                                                                                       // second
+               } catch (TimeoutException ex) {
+                       // timed out. Try to stop the code if possible.
+                       log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task");
+                       future.cancel(true);
+               } catch (Exception ex) {
+                       // timed out. Try to stop the code if possible.
+                       log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex);
+                       future.cancel(true);
+                       return false;
+               }
+               service.shutdown();
+               return true;
+       }
+
+       /**
+        * getConsumerGroup() returns Consumer group
+        * 
+        * @return
+        */
+       public String getConsumerGroup() {
+               return fGroup;
+       }
+
+       /**
+        * getConsumerId returns Consumer Id
+        * 
+        * @return
+        */
+       public String getConsumerId() {
+               return fId;
+       }
+
+       /**
+        * getState returns kafkaconsumer state
+        * 
+        * @return
+        */
+       private KafkaConsumer.State getState() {
+               return this.state;
+       }
+
+       /**
+        * setState() sets the kafkaConsumer state
+        * 
+        * @param state
+        */
+       private void setState(KafkaConsumer.State state) {
+               this.state = state;
+       }
+
+       // private ConsumerConnector fConnector;
+       private final String fTopic;
+       private final String fGroup;
+       private final String fId;
+       private final String fLogTag;
+       // private final KafkaStream<byte[], byte[]> fStream;
+       private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer;
+       private long fCreateTimeMs;
+       private long fLastTouch;
+       private long offset;
+       private KafkaConsumer.State state;
+       private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
+       private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs;
+       // private static final Logger log =
+       // LoggerFactory.getLogger(KafkaConsumer.class);
+
+       @Override
+       public void commitOffsets() {
+               if (getState() == KafkaConsumer.State.CLOSED) {
+                       log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
+                       return;
+               }
+               kConsumer.commitSync();
+               // fConsumer.close();
+
+       }
+
+
+
+       @Override
+       public void setOffset(long offsetval) {
+               // TODO Auto-generated method stub
+               offset = offsetval;
+       }
+}