DMAAP-MR - Merge MR repos
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
new file mode 100644 (file)
index 0000000..4d6c81c
--- /dev/null
@@ -0,0 +1,394 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *  
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.dmf.mr.backends.kafka;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.onap.dmaap.dmf.mr.backends.Consumer;
+import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.concurrent.*;
+
+/**
+ * A consumer instance that's created per-request. These are stateless so that
+ * clients can connect to this service as a proxy.
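+ * <p>
+ * A typical (hypothetical) lifecycle, sketched for illustration only; the
+ * names below are placeholders, not part of this class:
+ * <pre>
+ * Kafka011Consumer c = new Kafka011Consumer("topic", "group", "id", kafkaConsumer, liveLockAvoider);
+ * Consumer.Message m = c.nextMessage(); // may return null while records are still being fetched
+ * if (m != null) {
+ *     process(m.getMessage()); // process(...) is a placeholder
+ * }
+ * c.commitOffsets();
+ * c.close();
+ * </pre>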
+ * 
+ * @author Ram
+ *
+ */
+public class Kafka011Consumer implements Consumer {
+       private enum State {
+               OPENED, CLOSED
+       }
+
+       /**
+        * Constructor. It takes the following five parameters:
+        * 
+        * @param topic
+        * @param group
+        * @param id
+        * @param cc
+        * @param klla
+        * 
+        */
+
+       public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
+                       KafkaLiveLockAvoider2 klla) throws Exception {
+               fTopic = topic;
+               fGroup = group;
+               fId = id;
+               fCreateTimeMs = System.currentTimeMillis();
+               fLastTouch = fCreateTimeMs;
+               fPendingMsgs = new LinkedBlockingQueue<>();
+               fLogTag = fGroup + "(" + fId + ")/" + fTopic;
+               offset = 0;
+               state = State.OPENED;
+               kConsumer = cc;
+               fKafkaLiveLockAvoider = klla;
+
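+               // Optional override: the "consumer.timeout" property (in seconds) bounds how
+               // long nextMessage() waits for the background poll task (default 5 seconds).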
+               String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                               "consumer.timeout");
+               if (StringUtils.isNotEmpty(consumerTimeOut)) {
+                       consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
+               }
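+               // KafkaConsumer is not thread-safe, so all access to it is synchronized on the instance.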
+               synchronized (kConsumer) {
+                       kConsumer.subscribe(Arrays.asList(topic));
+               }
+       }
+
+       private Message makeMessage(final ConsumerRecord<String, String> msg) {
+               return new Message() {
+                       @Override
+                       public long getOffset() {
+                               offset = msg.offset();
+                               return offset;
+                       }
+
+                       @Override
+                       public String getMessage() {
+                               return msg.value();
+                       }
+               };
+       }
+
+       @Override
+       public synchronized Message nextMessage() {
+
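+               // Serve buffered records first: anything left over from an earlier poll is
+               // delivered before Kafka is polled again.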
+               try {
+                       if (!fPendingMsgs.isEmpty()) {
+                               return makeMessage(fPendingMsgs.take());
+                       }
+               } catch (InterruptedException x) {
+                       log.warn("Pending message take() threw InterruptedException despite a non-empty queue; ignoring. ("
+                                       + x.getMessage() + ")", x);
+                       Thread.currentThread().interrupt();
+               }
+
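+               // Run the poll on a worker thread so the wait can be bounded by
+               // future.get(consumerPollTimeOut, SECONDS) below.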
+               Callable<Boolean> run = new Callable<Boolean>() {
+                       @Override
+                       public Boolean call() throws Exception {
+                               try {
+                                       ConsumerRecords<String, String> records;
+                                       synchronized (kConsumer) {
+                                               records = kConsumer.poll(500);
+                                       }
+                                       for (ConsumerRecord<String, String> record : records) {
+                                               fPendingMsgs.offer(record);
+                                       }
+
+                               } catch (KafkaException x) {
+                                       log.debug(fLogTag + ": KafkaException ", x);
+
+                               } catch (IllegalStateException | IllegalArgumentException x) {
+                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
+
+                               }
+
+                               return true;
+                       }
+               };
+
+               RunnableFuture<Boolean> future = new FutureTask<>(run);
+               ExecutorService service = Executors.newSingleThreadExecutor();
+               service.execute(future);
+               try {
+                       future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait up to consumerPollTimeOut seconds (default 5)
+               } catch (TimeoutException ex) {
+               log.error("TimeoutException in in Kafka consumer ", ex);
+                       // timed out. Try to stop the code if possible.
+                       String apiNodeId = null;
+                       try {
+                               apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
+                       } catch (UnknownHostException e1) {
+                               log.error("unable to get the localhost address ", e1);
+                       }
+
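+                       // Notify the live-lock avoider for this node/topic/group so a stalled
+                       // consumer group can be nudged back into making progress.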
+                       try {
+                               if (fKafkaLiveLockAvoider != null) {
+                                       fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
+                               }
+                       } catch (Exception e) {
+                               log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
+                       }
+
+                       forcePollOnConsumer();
+                       future.cancel(true);
+               } catch (Exception ex) {
+                       log.error("Exception in Kafka consumer ", ex);
+                       // not a timeout, but still try to stop the poll task if possible.
+                       future.cancel(true);
+               }
+               service.shutdown();
+
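+               // Any records fetched by this poll were queued; callers invoke nextMessage()
+               // again to receive them.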
+               return null;
+
+       }
+
+       /**
+        * getName() returns the consumer name, built from fTopic, fGroup and fId.
+        */
+       @Override
+       public String getName() {
+               return fTopic + " : " + fGroup + " : " + fId;
+       }
+
+       /**
+        * getCreateTimeMs() returns the creation time (fCreateTimeMs) in milliseconds.
+        */
+       @Override
+       public long getCreateTimeMs() {
+               return fCreateTimeMs;
+       }
+
+       public KafkaConsumer<String, String> getConsumer() {
+               return kConsumer;
+       }
+
+       /**
+        * getLastAccessMs() returns the last access time (fLastTouch) in milliseconds.
+        */
+       @Override
+       public long getLastAccessMs() {
+               return fLastTouch;
+       }
+
+       /**
+        * getOffset() returns the current offset.
+        */
+       @Override
+       public long getOffset() {
+               return offset;
+       }
+
+       /**
+        * touch() updates fLastTouch to the current time in milliseconds.
+        */
+       public void touch() {
+               fLastTouch = System.currentTimeMillis();
+       }
+
+       /**
+        * getLastTouch() returns the last touch time (fLastTouch) in milliseconds.
+        */
+       public long getLastTouch() {
+               return fLastTouch;
+       }
+
+       /**
+        * close() shuts down the underlying KafkaConsumer and sets the state to CLOSED.
+        */
+       public boolean close() {
+               if (getState() == State.CLOSED) {
+                       log.error("close() called on closed KafkaConsumer " + getName());
+                       return true;
+               }
+
+               return kafkaConnectorshuttask();
+       }
+
+       /* Close the consumer on a worker thread, timing out if the Kafka shutdown hangs. */
+
+       private boolean kafkaConnectorshuttask() {
+               Callable<Boolean> run = new Callable<Boolean>() {
+                       @Override
+                       public Boolean call() throws Exception {
+
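+                               // close() can block indefinitely if brokers are unreachable,
+                               // hence the bounded future.get(...) wait below.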
+                               try {
+
+                                       kConsumer.close();
+
+                               } catch (Exception e) {
+                                       log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
+                                       throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
+
+                               }
+                               log.info("Kafka connection closure with in 15 seconds by a Executors task");
+
+                               return true;
+                       }
+               };
+
+               RunnableFuture<Boolean> future = new FutureTask<>(run);
+               ExecutorService service = Executors.newSingleThreadExecutor();
+               service.execute(future);
+               try {
+                       future.get(200, TimeUnit.SECONDS); // wait up to 200 seconds
+               } catch (TimeoutException ex) {
+                       // timed out. Try to stop the close if possible.
+                       log.info("Timeout occurred - Kafka connection did not close within 200 seconds in the executor task ", ex);
+                       future.cancel(true);
+                       setState(State.OPENED);
+               } catch (Exception ex) {
+                       // close failed. Try to stop the task if possible.
+                       log.error("Exception occurred - Kafka connection did not close within 200 seconds in the executor task ", ex);
+                       future.cancel(true);
+                       setState(State.OPENED);
+                       return false;
+               }
+               service.shutdown();
+               setState(State.CLOSED);
+               return true;
+       }
+
+       public void forcePollOnConsumer() {
+               Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
+       }
+
+       /**
+        * getConsumerGroup() returns the consumer group.
+        *
+        * @return the consumer group name
+        */
+       public String getConsumerGroup() {
+               return fGroup;
+       }
+
+       /**
+        * getConsumerId() returns the consumer id.
+        *
+        * @return the consumer id
+        */
+       public String getConsumerId() {
+               return fId;
+       }
+
+       /**
+        * getState() returns the KafkaConsumer state.
+        *
+        * @return the current state
+        */
+       private State getState() {
+               return this.state;
+       }
+
+       /**
+        * setState() sets the KafkaConsumer state.
+        *
+        * @param state
+        */
+       private void setState(State state) {
+               this.state = state;
+       }
+
+       private final String fTopic;
+       private final String fGroup;
+       private final String fId;
+       private final String fLogTag;
+
+       private final KafkaConsumer<String, String> kConsumer;
+       private final long fCreateTimeMs;
+       private long fLastTouch;
+       private long offset;
+       private State state;
+       private final KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+       private int consumerPollTimeOut = 5;
+       private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
+       private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
+
+       @Override
+       public void commitOffsets() {
+               if (getState() == State.CLOSED) {
+                       log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
+                       return;
+               }
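+               // Synchronously commit the offsets of records returned by the most recent poll.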
+               kConsumer.commitSync();
+       }
+
+       @Override
+       public void setOffset(long offsetval) {
+               offset = offsetval;
+       }
+
+       public void setConsumerCache(KafkaConsumerCache cache) {
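+               // No-op: this implementation does not retain a reference to the consumer cache.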
+       }
+
+}