src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;

import com.att.aft.dme2.internal.springframework.beans.factory.annotation.Autowired;
import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.http.annotation.NotThreadSafe;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.backends.MetricsSet;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;

/**
 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
 *                must serialize access to this cache.
 * @author peter
 *
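 * <p>A minimal usage sketch (the node id "api-node-1" and the metrics/curator
 * instances are hypothetical; assumes a configured CuratorFramework, e.g. from
 * ConfigurationReader):</p>
 * <pre>{@code
 * KafkaConsumerCache cache = KafkaConsumerCache.getInstance();
 * cache.setfApiId("api-node-1");   // node id; required before startCache()
 * cache.setfMetrics(metrics);      // a MetricsSet for cache hit/miss counters
 * cache.startCache(CambriaConstants.DMAAP, curator);
 * Kafka011Consumer kc = cache.getConsumerFor("myTopic", "myGroup", "client-1");
 * cache.stopCache();
 * }</pre>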
 */
@NotThreadSafe
public class KafkaConsumerCache {

	private static KafkaConsumerCache kafkaconscache = null;

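	// Lazy, unsynchronized singleton initialization: per the class contract
	// (@NotThreadSafe), callers are expected to serialize access externally.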
	public static KafkaConsumerCache getInstance() {
		if (kafkaconscache == null)
			kafkaconscache = new KafkaConsumerCache();

		return kafkaconscache;
	}

	private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
	private static final int kDefault_ConsumerHandoverWaitMs = 500;

	private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
	private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";

	private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
	private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";

	// Kafka defaults to timing out a client after 6 seconds of inactivity, but
	// it heartbeats even when the client isn't fetching. Here, we don't want to
	// prematurely rebalance the consumer group. Assuming clients are hitting the
	// server at least every 30 seconds, timing out after 2 minutes should be okay.
	// FIXME: consider allowing the client to specify its expected call rate?
	private static final long kDefault_MustTouchEveryMs = 1000L * 60 * 2;

	// check for expirations pretty regularly
	private static final long kDefault_SweepEverySeconds = 15;

	private enum Status {
		NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
	}

	@Autowired
	private DMaaPErrorMessages errorMessages;

	/**
	 * User-defined exception class for the Kafka consumer cache.
	 *
	 * @author nilanjana.maity
	 *
	 */
	public class KafkaConsumerCacheException extends Exception {
		/**
		 * Wraps a lower-level failure in a cache exception.
		 *
		 * @param t the underlying cause
		 */
		KafkaConsumerCacheException(Throwable t) {
			super(t);
		}

		/**
		 * Creates a cache exception with the given message.
		 *
		 * @param s the error message
		 */
		public KafkaConsumerCacheException(String s) {
			super(s);
		}

		private static final long serialVersionUID = 1L;
	}

	/**
	 * Creates a KafkaConsumerCache object. Before it is used, you must call
	 * startCache().
	 *
	 */
	public KafkaConsumerCache() {

		String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSetting_ZkBasePath);
		if (null == strkSetting_ZkBasePath)
			strkSetting_ZkBasePath = kDefault_ZkBasePath;
		fBaseZkPath = strkSetting_ZkBasePath;

		fConsumers = new ConcurrentHashMap<>();
		fSweepScheduler = Executors.newScheduledThreadPool(1);

		curatorConsumerCache = null;

		status = Status.NOT_STARTED;

		// Watcher for consumer rebalancing across nodes (work-around for the
		// Kafka 0.11 rebalancing behavior).
		listener = new ConnectionStateListener() {
			public void stateChanged(CuratorFramework client, ConnectionState newState) {
				if (newState == ConnectionState.LOST) {
					log.info("ZooKeeper connection expired");
					handleConnectionLoss();
				} else if (newState == ConnectionState.READ_ONLY) {
					log.warn("ZooKeeper connection set to read only mode.");
				} else if (newState == ConnectionState.RECONNECTED) {
					log.info("ZooKeeper connection re-established");
					handleReconnection();
				} else if (newState == ConnectionState.SUSPENDED) {
					log.warn("ZooKeeper connection has been suspended.");
					handleConnectionSuspended();
				}
			}
		};
	}

	/**
	 * Start the cache service. This must be called before any get/put
	 * operations.
	 *
	 * @param mode
	 *            DMAAP or cambria
	 * @param curator
	 * @throws KafkaConsumerCacheException
	 */
	public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {

		if (fApiId == null) {
			throw new IllegalArgumentException("API Node ID must be specified.");
		}

		try {

			if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
				curator = getCuratorFramework(curator);
			}
			curator.getConnectionStateListenable().addListener(listener);
			setStatus(Status.CONNECTED);
			curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
			curatorConsumerCache.start();
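			// Each child znode under fBaseZkPath represents one cached consumer;
			// its data is the apiId of the node that currently owns it. So
			// CHILD_UPDATED means another node has claimed a consumer held here,
			// and CHILD_REMOVED means the consumer was dropped cluster-wide.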
			curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
					switch (event.getType()) {
					case CHILD_ADDED: {
						try {
							final String apiId = new String(event.getData().getData());
							final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

							log.info(apiId + " started consumer " + consumer);
						} catch (Exception ex) {
							log.info("Error occurred while handling an added child node: " + ex);
						}
						break;
					}
					case CHILD_UPDATED: {
						final String apiId = new String(event.getData().getData());
						final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

						if (fConsumers.containsKey(consumer)) {
							log.info(apiId + " claimed consumer " + consumer + " from " + fApiId);
							// Drop the claimed consumer locally so the connection
							// is not handed over while the claiming node is still
							// serving this client id.
							dropClaimedConsumer(consumer);
						}

						break;
					}
					case CHILD_REMOVED: {
						final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

						if (fConsumers.containsKey(consumer)) {
							log.info("Consumer " + consumer + " was removed in ZooKeeper; dropping it from the local cache");
							dropConsumer(consumer, false);
						}

						break;
					}

					default:
						break;
					}
				}
			});

			// initialize the ZK path
			EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
			ensurePath.ensure(curator.getZookeeperClient());

			long freq = kDefault_SweepEverySeconds;
			String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_SweepEverySeconds);
			if (null != strkSetting_SweepEverySeconds) {
				freq = Long.parseLong(strkSetting_SweepEverySeconds);
			}

			fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
			log.info("KafkaConsumerCache started");
			log.info("sweeping cached clients every " + freq + " seconds");
		} catch (ZkException e) {
			log.error("ZkException occurred while starting the consumer cache: ", e);
			throw new KafkaConsumerCacheException(e);
		} catch (Exception e) {
			log.error("Exception occurred while starting the consumer cache: ", e);
			throw new KafkaConsumerCacheException(e);
		}
	}

	/**
	 * Gets the curator object, starting it if necessary and waiting until the
	 * ZooKeeper connection is established.
	 *
	 * @param curator
	 * @return curator object
	 */
	public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
		if (curator.getState() == CuratorFrameworkState.LATENT) {
			curator.start();

			try {
				curator.blockUntilConnected();
			} catch (InterruptedException e) {
				log.error("error while setting up curator framework: ", e);
				Thread.currentThread().interrupt();
			}
		}

		return curator;
	}

	/**
	 * Stop the cache service.
	 */
	public void stopCache() {
		setStatus(Status.DISCONNECTED);

		final CuratorFramework curator = ConfigurationReader.getCurator();

		if (curator != null) {
			try {
				curator.getConnectionStateListenable().removeListener(listener);
				curatorConsumerCache.close();
				log.info("Curator PathChildrenCache closed");
			} catch (ZkInterruptedException e) {
				log.warn("Curator client close interrupted: ", e);
			} catch (IOException e) {
				log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
			}

			curatorConsumerCache = null;
		}

		if (fSweepScheduler != null) {
			fSweepScheduler.shutdownNow();
			log.info("cache sweeper stopped");
		}

		if (fConsumers != null) {
			fConsumers.clear();
			fConsumers = null;
		}

		setStatus(Status.NOT_STARTED);

		log.info("Consumer cache service stopped");
	}

	/**
	 * Get a cached consumer by topic, group, and id, if it exists (and remains
	 * valid). In addition, this method waits for all other consumer caches in
	 * the cluster to release their ownership and delete their version of this
	 * consumer.
	 *
	 * @param topic
	 * @param consumerGroupId
	 * @param clientId
	 * @return a consumer, or null
	 */
	public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
			throws KafkaConsumerCacheException {
		if (getStatus() != Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");

		final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
		final Kafka011Consumer kc = fConsumers.get(consumerKey);

		if (kc != null) {
			log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
			kc.touch();
			fMetrics.onKafkaConsumerCacheHit();
		} else {
			log.debug("Consumer cache miss for [" + consumerKey + "]");
			fMetrics.onKafkaConsumerCacheMiss();
		}

		return kc;
	}

	/**
	 * Get all cached consumers for the given topic/group prefix, excluding the
	 * entry for the given client id.
	 *
	 * @param topicgroup
	 * @param clientId
	 * @return a list of consumers (possibly empty)
	 */
	public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
			throws KafkaConsumerCacheException {
		if (getStatus() != Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

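		// Keys have the form <topic>::<group>::<client> (see makeConsumerKey),
		// so the prefix match selects every consumer in this topic/group, while
		// the endsWith check excludes the caller's own client id.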
		Enumeration<String> strEnum = fConsumers.keys();
		String consumerLocalKey = null;
		while (strEnum.hasMoreElements()) {
			consumerLocalKey = strEnum.nextElement();

			if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
				kcl.add(fConsumers.get(consumerLocalKey));
			}
		}

		return kcl;
	}

	public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
		if (getStatus() != Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

		Enumeration<String> strEnum = fConsumers.keys();
		String consumerLocalKey = null;
		while (strEnum.hasMoreElements()) {
			consumerLocalKey = strEnum.nextElement();

			if (consumerLocalKey.startsWith(group)) {
				kcl.add(fConsumers.get(consumerLocalKey));
			}
		}

		return kcl;
	}

	/**
	 * Put a consumer into the cache by topic, group and ID
	 *
	 * @param topic
	 * @param consumerGroupId
	 * @param consumerId
	 * @param consumer
	 * @throws KafkaConsumerCacheException
	 */
	public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
			throws KafkaConsumerCacheException {
		if (getStatus() != Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");

		final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
		fConsumers.put(consumerKey, consumer);

		log.info("Consumer added to cache; consumer key: " + consumerKey + ", apiId: " + fApiId);
	}

	public Collection<? extends Consumer> getConsumers() {
		return new LinkedList<>(fConsumers.values());
	}

	/**
	 * Drops all consumers from the cache.
	 */
	public void dropAllConsumers() {
		for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
			dropConsumer(entry.getKey(), true);
		}

		// consumers should be empty here
		if (!fConsumers.isEmpty()) {
			log.warn("During dropAllConsumers, the consumer map is not empty.");
			fConsumers.clear();
		}
	}

	/**
	 * Drop a consumer from our cache due to a timeout
	 *
	 * @param key
	 */
	private void dropTimedOutConsumer(String key) {
		fMetrics.onKafkaConsumerTimeout();

		if (!fConsumers.containsKey(key)) {
			log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
			return;
		}

		// First, drop this consumer from our cache
		boolean isdrop = dropConsumer(key, true);
		if (!isdrop) {
			return;
		}
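		// Remove this consumer's ownership node from ZooKeeper as well;
		// guaranteed() keeps retrying the delete in the background if the
		// connection drops before it completes.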
		final CuratorFramework curator = ConfigurationReader.getCurator();

		try {
			curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
			log.info("deleted " + fBaseZkPath + "/" + key);
		} catch (NoNodeException e) {
			log.warn("A consumer was deleted from " + fApiId
					+ "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
		} catch (Exception e) {
			log.warn("Unexpected exception while deleting consumer: ", e);
		}

		try {
			int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
			String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_ConsumerHandoverWaitMs);
			if (strkSetting_ConsumerHandoverWaitMs != null)
				consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
			Thread.sleep(consumerHandoverWaitMs);
		} catch (InterruptedException e) {
			log.error("InterruptedException in dropTimedOutConsumer", e);
			Thread.currentThread().interrupt();
		}
		log.info("Dropped " + key + " consumer due to timeout");
	}

	/**
	 * Drop a consumer from our cache due to another API node claiming it as
	 * their own.
	 *
	 * @param key
	 */
	private void dropClaimedConsumer(String key) {
		// if the consumer is still in our cache, it implies a claim.
		if (fConsumers.containsKey(key)) {
			fMetrics.onKafkaConsumerClaimed();
			log.info("Consumer [" + key + "] claimed by another node.");
		}
		log.info("Dropping claimed Kafka consumer " + key);
		dropConsumer(key, false);
	}

	/**
	 * Removes the consumer from the cache and closes its connection to the
	 * kafka broker(s).
	 *
	 * @param key
	 * @param dueToTimeout
	 * @return true if the consumer was closed and removed (or was not cached),
	 *         false if the close failed
	 */
	private boolean dropConsumer(String key, boolean dueToTimeout) {
		final Kafka011Consumer kc = fConsumers.get(key);
		log.info("closing Kafka consumer " + key + " object " + kc);
		if (kc != null) {
			if (kc.close()) {
				fConsumers.remove(key);
			} else {
				return false;
			}
		}
		return true;
	}

	private MetricsSet fMetrics;
	private final String fBaseZkPath;
	private final ScheduledExecutorService fSweepScheduler;
	private String fApiId;

	public void setfMetrics(final MetricsSet metrics) {
		this.fMetrics = metrics;
	}

	public void setfApiId(final String id) {
		this.fApiId = id;
	}

	private final ConnectionStateListener listener;

	private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
	private PathChildrenCache curatorConsumerCache;

	private volatile Status status;

	private void handleReconnection() {

		log.info("Reading current cache data from ZK and synchronizing local cache");
		final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
		// Remove all the consumers in this API node's cache that now belong to
		// other API nodes.
		for (ChildData cachedConsumer : cacheData) {
			final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
			final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
					: "undefined";
			if (!fApiId.equals(owningApiId)) {
				// Note: this removes only the local cache entry; any lock held
				// for the consumer still exists and is not handled by the
				// Kafka consumer factory.
				fConsumers.remove(consumerId);
				log.info("Synchronizing local cache with ZK: " + owningApiId + " owns " + consumerId
						+ "; removing it from the local cache");
			}
		}

		setStatus(Status.CONNECTED);
	}

	private void handleConnectionSuspended() {
		log.info("Suspending cache until ZK connection is re-established");

		setStatus(Status.SUSPENDED);
	}

	private void handleConnectionLoss() {
		log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");

		setStatus(Status.DISCONNECTED);

		closeAllCachedConsumers();
		fConsumers.clear();
	}

	private void closeAllCachedConsumers() {
		for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
			try {
				entry.getValue().close();
			} catch (Exception e) {
				log.info("Error occurred while closing consumer during cache clear: " + e);
			}
		}
	}

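	// Cache keys double as ZooKeeper child-node names under fBaseZkPath, so the
	// same <topic>::<group>::<client> string identifies a consumer both locally
	// and cluster-wide.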
	private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
		return topic + "::" + consumerGroupId + "::" + clientId;
	}

	/**
	 * Signals this node's ownership of a consumer by writing its API id to the
	 * consumer's znode.
	 *
	 * @param topic
	 * @param consumerGroupId
	 * @param consumerId
	 * @throws KafkaConsumerCacheException
	 */
	public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
			throws KafkaConsumerCacheException {
		// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
		final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);

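		// Writing fApiId into the consumer's znode triggers CHILD_UPDATED on
		// every other node's PathChildrenCache (see startCache), prompting them
		// to drop their local copy of this consumer.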
		try (final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
			final String consumerPath = fBaseZkPath + "/" + consumerKey;
			log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
			final CuratorFramework curator = ConfigurationReader.getCurator();

			try {
				curator.setData().forPath(consumerPath, fApiId.getBytes());
			} catch (NoNodeException e) {
				log.info("KeeperException.NoNodeException occurred", e);
				curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
			}
			log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
			timer.end();
		} catch (Exception e) {
			log.error(fApiId + " failed to claim ownership of consumer " + consumerKey, e);
			throw new KafkaConsumerCacheException(e);
		}

		log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");

		try {
			int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
			String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_ConsumerHandoverWaitMs);
			if (strkSetting_ConsumerHandoverWaitMs != null)
				consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
			Thread.sleep(consumerHandoverWaitMs);
		} catch (InterruptedException e) {
			log.error("InterruptedException in signalOwnership", e);
			Thread.currentThread().interrupt();
		}
	}

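	// No live-lock avoider is maintained by this cache, so this getter always
	// returns null; callers must handle the null return.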
	public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
		return null;
	}

	public void sweep() {
		final LinkedList<String> removals = new LinkedList<>();
		long mustTouchEveryMs = kDefault_MustTouchEveryMs;
		String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSetting_TouchEveryMs);
		if (null != strkSetting_TouchEveryMs) {
			mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
		}

		final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;

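		// A consumer expires when it has not been touched (i.e., served a
		// request) within mustTouchEveryMs; expired keys are collected first
		// and then dropped via dropTimedOutConsumer.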
		for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
			final long lastTouchMs = e.getValue().getLastTouch();
			log.debug("sweep check: consumer " + e.getKey() + " lastTouch " + lastTouchMs + " < " + oldestAllowedTouchMs);

			if (lastTouchMs < oldestAllowedTouchMs) {
				log.info("consumer " + e.getKey() + " has expired");
				removals.add(e.getKey());
			}
		}

		for (String key : removals) {
			dropTimedOutConsumer(key);
		}
	}

	/**
	 * Runnable that invokes the sweep method; scheduled at a fixed rate by
	 * fSweepScheduler.
	 *
	 * @author nilanjana.maity
	 *
	 */
	private class sweeper implements Runnable {
		/**
		 * run method
		 */
		public void run() {
			sweep();
		}
	}

	/**
	 * Drops the consumer identified by topic, consumer group, and client id.
	 *
	 * @param topic
	 * @param consumerGroup
	 * @param clientId
	 */
	public void dropConsumer(String topic, String consumerGroup, String clientId) {
		dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
	}

	private Status getStatus() {
		return this.status;
	}

	private void setStatus(Status status) {
		this.status = status;
	}

	private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);

}