DMAAP-MR - Merge MR repos
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
new file mode 100644 (file)
index 0000000..ac68a11
--- /dev/null
@@ -0,0 +1,729 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *  
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.dmf.mr.backends.kafka;
+
+import com.att.aft.dme2.internal.springframework.beans.factory.annotation.Autowired;
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.metrics.CdmTimer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.onap.dmaap.dmf.mr.backends.Consumer;
+import org.onap.dmaap.dmf.mr.backends.MetricsSet;
+import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
+import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
+import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
+
+/**
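+ * Cache of active Kafka consumers, keyed by topic::group::clientId, with
+ * ownership of each consumer tracked in ZooKeeper.
+ *
+ * <p>Typical lifecycle, as a sketch (the field wiring below mirrors what
+ * KafkaConsumerFactory is expected to do):</p>
+ *
+ * <pre>
+ * KafkaConsumerCache cache = KafkaConsumerCache.getInstance();
+ * cache.setfApiId(apiId);
+ * cache.setfMetrics(metrics);
+ * cache.startCache(CambriaConstants.DMAAP, curator);
+ * cache.putConsumerFor(topic, group, clientId, consumer);
+ * Kafka011Consumer kc = cache.getConsumerFor(topic, group, clientId);
+ * </pre>
+ *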
+ * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
+ *                must be thread-safe.
+ * @author peter
+ *
+ */
+@NotThreadSafe
+public class KafkaConsumerCache {
+
+       private static KafkaConsumerCache kafkaconscache = null;
+
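+       // Lazily initialized, unsynchronized singleton; consistent with the class's
+       // @NotThreadSafe contract, callers are expected to serialize access.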
+       public static KafkaConsumerCache getInstance() {
+               if (kafkaconscache == null)
+                       kafkaconscache = new KafkaConsumerCache();
+
+               return kafkaconscache;
+       }
+
+       private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
+       private static final int kDefault_ConsumerHandoverWaitMs = 500;
+
+       private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
+       private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
+
+       private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
+       private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
+
+       // Kafka defaults to timing out a client after 6 seconds of inactivity, but
+       // it heartbeats even when the client isn't fetching. Here, we don't want to
+       // prematurely rebalance the consumer group. Assuming clients are hitting
+       // the server at least every 30 seconds, timing out after 2 minutes should
+       // be okay.
+       // FIXME: consider allowing the client to specify its expected call rate?
+       private static final long kDefault_MustTouchEveryMs = 1000L * 60 * 2;
+
+       // check for expirations pretty regularly
+       private static final long kDefault_SweepEverySeconds = 15;
+
+       private enum Status {
+               NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
+       }
+
+       @Autowired
+       private DMaaPErrorMessages errorMessages;
+
+       
+       /**
+        * User-defined exception class for the Kafka consumer cache.
+        * 
+        * @author nilanjana.maity
+        *
+        */
+       public class KafkaConsumerCacheException extends Exception {
+               /**
+                * Wraps an underlying cause.
+                * 
+                * @param t the cause
+                */
+               KafkaConsumerCacheException(Throwable t) {
+                       super(t);
+               }
+
+               /**
+                * 
+                * @param s the error message
+                */
+               public KafkaConsumerCacheException(String s) {
+                       super(s);
+               }
+
+               private static final long serialVersionUID = 1L;
+       }
+
+       /**
+        * Creates a KafkaConsumerCache object. Before it is used, you must call
+        * startCache()
+        *
+        */
+       public KafkaConsumerCache() {
+
+               String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                               kSetting_ZkBasePath);
+               if (null == strkSetting_ZkBasePath)
+                       strkSetting_ZkBasePath = kDefault_ZkBasePath;
+               fBaseZkPath = strkSetting_ZkBasePath;
+
+               fConsumers = new ConcurrentHashMap<>();
+               fSweepScheduler = Executors.newScheduledThreadPool(1);
+
+               curatorConsumerCache = null;
+
+               status = Status.NOT_STARTED;
+               // Watcher for consumer rebalancing across nodes (work-around for
+               // Kafka 0.11 rebalancing).
+
+               listener = new ConnectionStateListener() {
+                       public void stateChanged(CuratorFramework client, ConnectionState newState) {
+                               if (newState == ConnectionState.LOST) {
+
+                                       log.info("ZooKeeper connection expired");
+                                       handleConnectionLoss();
+                               } else if (newState == ConnectionState.READ_ONLY) {
+                                       log.warn("ZooKeeper connection set to read only mode.");
+                               } else if (newState == ConnectionState.RECONNECTED) {
+                                       log.info("ZooKeeper connection re-established");
+                                       handleReconnection();
+                               } else if (newState == ConnectionState.SUSPENDED) {
+                                       log.warn("ZooKeeper connection has been suspended.");
+                                       handleConnectionSuspended();
+                               }
+                       }
+               };
+       }
+
+       /**
+        * Start the cache service. This must be called before any get/put
+        * operations.
+        * 
+        * @param mode
+        *            DMAAP or cambria
+        * @param curator
+        * @throws KafkaConsumerCacheException
+        */
+       public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
+
+               if (fApiId == null) {
+                       throw new IllegalArgumentException("API Node ID must be specified.");
+               }
+
+               try {
+
+                       if (CambriaConstants.DMAAP.equals(mode)) {
+                               curator = getCuratorFramework(curator);
+                       }
+                       curator.getConnectionStateListenable().addListener(listener);
+                       setStatus(Status.CONNECTED);
+                       curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
+                       curatorConsumerCache.start();
+                       curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
+                               public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+                                       switch (event.getType()) {
+                                       case CHILD_ADDED: {
+                                               try {
+                                                       final String apiId = new String(event.getData().getData());
+                                                       final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+                                                       log.info(apiId + " started consumer " + consumer);
+                                               } catch (Exception ex) {
+                                                       log.info("Error occurred while handling CHILD_ADDED event: " + ex);
+                                               }
+                                               break;
+                                       }
+                                       case CHILD_UPDATED: {
+                                               final String apiId = new String(event.getData().getData());
+                                               final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+                                               if (fConsumers.containsKey(consumer)) {
+                                                       log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
+                                                                       + "; dropping the local claim");
+                                                       // Another API node has claimed this consumer; release the
+                                                       // locally cached instance rather than handing over the live
+                                                       // connection for this client id.
+                                                       dropClaimedConsumer(consumer);
+                                               }
+
+                                               break;
+                                       }
+                                       case CHILD_REMOVED: {
+                                               final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
+
+                                               if (fConsumers.containsKey(consumer)) {
+                                                       log.info("Consumer " + consumer
+                                                                       + " was removed from ZooKeeper; dropping it from the local cache");
+                                                       dropConsumer(consumer, false);
+                                               }
+
+                                               break;
+                                       }
+
+                                       default:
+                                               break;
+                                       }
+                               }
+                       });
+
+                       // initialize the ZK path
+                       EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
+                       ensurePath.ensure(curator.getZookeeperClient());
+
+                       long freq = kDefault_SweepEverySeconds;
+                       String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                                       kSetting_SweepEverySeconds);
+                       if (null != strkSetting_SweepEverySeconds) {
+                               freq = Long.parseLong(strkSetting_SweepEverySeconds);
+                       }
+
+                       fSweepScheduler.scheduleAtFixedRate(new Sweeper(), freq, freq, TimeUnit.SECONDS);
+                       log.info("KafkaConsumerCache started");
+                       log.info("sweeping cached clients every " + freq + " seconds");
+               } catch (ZkException e) {
+                       log.error("ZkException occurred while starting the consumer cache: " + e);
+                       throw new KafkaConsumerCacheException(e);
+               } catch (Exception e) {
+                       log.error("Exception occurred while starting the consumer cache: " + e);
+                       throw new KafkaConsumerCacheException(e);
+               }
+       }
+
+       /**
+        * Gets the curator object, starting it (and blocking until the ZooKeeper
+        * connection is established) if it has not been started yet.
+        * 
+        * @param curator
+        * @return curator object
+        */
+       public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
+               if (curator.getState() == CuratorFrameworkState.LATENT) {
+                       curator.start();
+
+                       try {
+                               curator.blockUntilConnected();
+                       } catch (InterruptedException e) {
+                               log.error("error while setting curator framework :",e);
+                               Thread.currentThread().interrupt();
+                       }
+               }
+
+               return curator;
+       }
+
+       /**
+        * Stop the cache service.
+        */
+       public void stopCache() {
+               setStatus(Status.DISCONNECTED);
+
+               final CuratorFramework curator = ConfigurationReader.getCurator();
+
+               if (curator != null) {
+                       try {
+                               curator.getConnectionStateListenable().removeListener(listener);
+                               if (curatorConsumerCache != null) {
+                                       curatorConsumerCache.close();
+                               }
+                               log.info("Curator client closed");
+                       } catch (ZkInterruptedException e) {
+                               log.warn("Curator client close interrupted: ", e);
+                       } catch (IOException e) {
+                               log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
+                       }
+
+                       curatorConsumerCache = null;
+               }
+
+               if (fSweepScheduler != null) {
+                       fSweepScheduler.shutdownNow();
+                       log.info("cache sweeper stopped");
+               }
+
+               if (fConsumers != null) {
+                       fConsumers.clear();
+                       fConsumers = null;
+               }
+
+               setStatus(Status.NOT_STARTED);
+
+               log.info("Consumer cache service stopped");
+       }
+
+       /**
+        * Get a cached consumer by topic, group, and id, if it exists (and remains
+        * valid). In addition, this method waits for all other consumer caches in
+        * the cluster to release their ownership and delete their version of this
+        * consumer.
+        * 
+        * @param topic
+        * @param consumerGroupId
+        * @param clientId
+        * @return a consumer, or null
+        */
+       public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
+                       throws KafkaConsumerCacheException {
+               if (getStatus() != Status.CONNECTED)
+                       throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+               final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
+               final Kafka011Consumer kc = fConsumers.get(consumerKey);
+
+               if (kc != null) {
+                       log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
+                       kc.touch();
+                       fMetrics.onKafkaConsumerCacheHit();
+               } else {
+                       log.debug("Consumer cache miss for [" + consumerKey + "]");
+                       fMetrics.onKafkaConsumerCacheMiss();
+               }
+
+               return kc;
+       }
+
+       /**
+        * Get all cached consumers for the given topic::group prefix, excluding the
+        * entry for the given client id.
+        *
+        * @param topicgroup
+        * @param clientId
+        * @return a list of consumers, possibly empty
+        */
+       public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
+                       throws KafkaConsumerCacheException {
+               if (getStatus() != Status.CONNECTED)
+                       throw new KafkaConsumerCacheException("The cache service is unavailable.");
+               ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
+
+
+               Enumeration<String> strEnum = fConsumers.keys();
+               String consumerLocalKey = null;
+               while (strEnum.hasMoreElements()) {
+                       consumerLocalKey = strEnum.nextElement();
+
+                       if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
+                               kcl.add(fConsumers.get(consumerLocalKey));
+                       }
+               }
+
+               return kcl;
+       }
+
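+       /**
+        * Get all cached consumers whose key starts with the given prefix.
+        *
+        * @param group
+        * @return a list of consumers, possibly empty
+        */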
+       public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
+               if (getStatus() != Status.CONNECTED)
+                       throw new KafkaConsumerCacheException("The cache service is unavailable.");
+               ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
+
+               Enumeration<String> strEnum = fConsumers.keys();
+               String consumerLocalKey = null;
+               while (strEnum.hasMoreElements()) {
+                       consumerLocalKey = strEnum.nextElement();
+
+                       if (consumerLocalKey.startsWith(group)) {
+                               kcl.add(fConsumers.get(consumerLocalKey));
+                       }
+               }
+
+               return kcl;
+       }
+
+       /**
+        * Put a consumer into the cache by topic, group and ID
+        *
+        * @param topic
+        * @param consumerGroupId
+        * @param consumerId
+        * @param consumer
+        * @throws KafkaConsumerCacheException
+        */
+       public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
+                       throws KafkaConsumerCacheException {
+               if (getStatus() != Status.CONNECTED)
+                       throw new KafkaConsumerCacheException("The cache service is unavailable.");
+
+               final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+               fConsumers.put(consumerKey, consumer);
+
+               log.info("Consumer added to cache; key=" + consumerKey + ", apiId=" + fApiId);
+       }
+
+       public Collection<? extends Consumer> getConsumers() {
+               return new LinkedList<>(fConsumers.values());
+       }
+
+       /**
+        * Drops all consumers from the cache.
+        */
+       public void dropAllConsumers() {
+               for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+                       dropConsumer(entry.getKey(), true);
+               }
+
+               // consumers should be empty here
+               if (!fConsumers.isEmpty()) {
+                       log.warn("During dropAllConsumers, the consumer map is not empty.");
+                       fConsumers.clear();
+               }
+       }
+
+       /**
+        * Drop a consumer from our cache due to a timeout
+        *
+        * @param key
+        */
+       private void dropTimedOutConsumer(String key) {
+               fMetrics.onKafkaConsumerTimeout();
+
+               if (!fConsumers.containsKey(key)) {
+                       log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
+                       return;
+               }
+
+               // First, drop this consumer from our cache
+               boolean isdrop = dropConsumer(key, true);
+               if (!isdrop) {
+                       return;
+               }
+               final CuratorFramework curator = ConfigurationReader.getCurator();
+
+               try {
+                       curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
+                       log.info(" ^ deleted " + fBaseZkPath + "/" + key);
+               } catch (NoNodeException e) {
+                       log.warn("A consumer was deleted from " + fApiId
+                                       + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
+               } catch (Exception e) {
+                       log.warn("Unexpected exception while deleting consumer: ", e);
+               }
+
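+               // Back off briefly to give another API node time to take over the
+               // consumer; the wait is configurable via
+               // cambria.consumer.cache.handoverWaitMs.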
+               try {
+                       int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+                       String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                                       kSetting_ConsumerHandoverWaitMs);
+                       if (strkSetting_ConsumerHandoverWaitMs != null)
+                               consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+                       Thread.sleep(consumerHandoverWaitMs);
+               } catch (InterruptedException e) {
+                       log.error("InterruptedException in dropTimedOutConsumer",e);
+                       Thread.currentThread().interrupt();
+               }
+               log.info("Dropped " + key + " consumer due to timeout");
+       }
+
+       /**
+        * Drop a consumer from our cache due to another API node claiming it as
+        * their own.
+        *
+        * @param key
+        */
+       private void dropClaimedConsumer(String key) {
+               // if the consumer is still in our cache, it implies a claim.
+               if (fConsumers.containsKey(key)) {
+                       fMetrics.onKafkaConsumerClaimed();
+                       log.info("Consumer [" + key + "] claimed by another node.");
+               }
+               log.info("Dropping claimed Kafka consumer " + key);
+               dropConsumer(key, false);
+       }
+
+       /**
+        * Removes the consumer from the cache and closes its connection to the
+        * kafka broker(s).
+        *
+        * @param key
+        * @param dueToTimeout
+        * @return true if the consumer was closed and removed (or was not cached);
+        *         false if it could not be closed
+        */
+       private boolean dropConsumer(String key, boolean dueToTimeout) {
+               final Kafka011Consumer kc = fConsumers.get(key);
+               log.info("closing Kafka consumer " + key + " object " + kc);
+               if (kc != null) {
+
+                       if (kc.close()) {
+                               fConsumers.remove(key);
+
+                       } else {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private MetricsSet fMetrics;
+       private final String fBaseZkPath;
+       private final ScheduledExecutorService fSweepScheduler;
+       private String fApiId;
+
+       public void setfMetrics(final MetricsSet metrics) {
+               this.fMetrics = metrics;
+       }
+
+       public void setfApiId(final String id) {
+               this.fApiId = id;
+       }
+
+       private final ConnectionStateListener listener;
+
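+       // Local map of consumer key ("topic::group::clientId") to consumer; entries
+       // mirror this node's children under fBaseZkPath.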
+       private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
+       private PathChildrenCache curatorConsumerCache;
+
+       private volatile Status status;
+
+       private void handleReconnection() {
+
+               log.info("Reading current cache data from ZK and synchronizing local cache");
+               final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
+               // Remove all the consumers in this API nodes cache that now belong to
+               // other API nodes.
+               for (ChildData cachedConsumer : cacheData) {
+                       final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
+                       final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
+                                       : "undefined";
+                       if (!fApiId.equals(owningApiId)) {
+                               // Another API node now owns this consumer; remove it from the local
+                               // map. The consumer object is not closed here, and its lock still
+                               // exists; this is not accounted for in the kafka consumer factory.
+                               fConsumers.remove(consumerId);
+                               log.info("Synchronizing local cache with ZK: " + owningApiId
+                                               + " owns consumer " + consumerId + "; removing it locally");
+                       }
+               }
+
+               setStatus(Status.CONNECTED);
+       }
+
+       private void handleConnectionSuspended() {
+               log.info("Suspending cache until ZK connection is re-established");
+
+               setStatus(Status.SUSPENDED);
+       }
+
+       private void handleConnectionLoss() {
+               log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
+
+               setStatus(Status.DISCONNECTED);
+
+               closeAllCachedConsumers();
+               fConsumers.clear();
+       }
+
+       private void closeAllCachedConsumers() {
+               for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
+                       try {
+                               entry.getValue().close();
+                       } catch (Exception e) {
+                               log.info("Error occurred while closing consumer during cache clear: " + e);
+                       }
+               }
+       }
+
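+       // The consumer key doubles as the ZooKeeper child node name under fBaseZkPath.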
+       private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
+               return topic + "::" + consumerGroupId + "::" + clientId;
+       }
+
+       /**
+        * Claims ownership of a consumer by taking a lock: this API node's id is
+        * written to the consumer's ZooKeeper node.
+        *
+        * @param topic
+        * @param consumerGroupId
+        * @param consumerId
+        * @throws KafkaConsumerCacheException
+        */
+       public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
+                       throws KafkaConsumerCacheException {
+               // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
+               final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
+
+               try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
+                       final String consumerPath = fBaseZkPath + "/" + consumerKey;
+                       log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
+                       final CuratorFramework curator = ConfigurationReader.getCurator();
+
+                       try {
+                               curator.setData().forPath(consumerPath, fApiId.getBytes());
+                       } catch (NoNodeException e) {
+                               log.info("KeeperException.NoNodeException occurred", e);
+                               curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
+                       }
+                       log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
+                       timer.end();
+               } catch (Exception e) {
+                       log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
+                       throw new KafkaConsumerCacheException(e);
+               }
+
+               log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
+
+               try {
+                       int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
+                       String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                                       kSetting_ConsumerHandoverWaitMs);
+                       if (strkSetting_ConsumerHandoverWaitMs != null)
+                               consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
+                       Thread.sleep(consumerHandoverWaitMs);
+               } catch (InterruptedException e) {
+                       log.error("InterruptedException in signalOwnership", e);
+                       Thread.currentThread().interrupt();
+               }
+       }
+
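+       // Live-lock avoidance is not wired into this cache implementation; callers
+       // must tolerate a null return here.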
+       public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
+               return null;
+       }
+
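+       /**
+        * Scans the cached consumers and drops any that have not been touched within
+        * the configured window (cambria.consumer.cache.touchFreqMs, default two
+        * minutes).
+        */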
+       public void sweep() {
+               final LinkedList<String> removals = new LinkedList<>();
+               long mustTouchEveryMs = kDefault_MustTouchEveryMs;
+               String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                               kSetting_TouchEveryMs);
+               if (null != strkSetting_TouchEveryMs) {
+                       mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
+               }
+
+               final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
+
+               for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
+                       final long lastTouchMs = e.getValue().getLastTouch();
+                       log.debug("consumer " + e.getKey() + " last touched " + lastTouchMs + " (cutoff " + oldestAllowedTouchMs + ")");
+
+                       if (lastTouchMs < oldestAllowedTouchMs) {
+                               log.info("consumer " + e.getKey() + " has expired");
+                               removals.add(e.getKey());
+                       }
+               }
+
+               for (String key : removals) {
+                       dropTimedOutConsumer(key);
+               }
+       }
+
+       /**
+        * Runnable that invokes sweep(); scheduled at a fixed rate by startCache().
+        * 
+        * @author nilanjana.maity
+        *
+        */
+       private class Sweeper implements Runnable {
+               /**
+                * run method
+                */
+               public void run() {
+                       sweep();
+               }
+       }
+
+       /**
+        * Drops the consumer identified by topic, consumer group, and client id.
+        * 
+        * @param topic
+        * @param consumerGroup
+        * @param clientId
+        */
+       public void dropConsumer(String topic, String consumerGroup, String clientId) {
+               dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
+       }
+
+       private Status getStatus() {
+               return this.status;
+       }
+
+       private void setStatus(Status status) {
+               this.status = status;
+       }
+
+       private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
+       
+}
\ No newline at end of file