fixing code smells
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / KafkaConsumerCache.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *  
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *  
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *  
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.http.annotation.NotThreadSafe;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.springframework.beans.factory.annotation.Autowired;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.backends.MetricsSet;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;
/**
 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
 *                must be thread safe.
 *
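 * <p>A minimal lifecycle sketch (illustrative only; the metrics and curator
 * instances, and the "node-1" id, are assumed to come from the caller):
 * <pre>{@code
 * KafkaConsumerCache cache = KafkaConsumerCache.getInstance();
 * cache.setfApiId("node-1");
 * cache.setfMetrics(metrics);                         // a MetricsSet
 * cache.startCache(CambriaConstants.DMAAP, curator);  // a CuratorFramework
 * // ... getConsumerFor / putConsumerFor while serving requests ...
 * cache.stopCache();
 * }</pre>
 *
 * @author peter
 *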
 */
@NotThreadSafe
public class KafkaConsumerCache {

	private static KafkaConsumerCache kafkaconscache = null;

	// synchronized so that concurrent first calls cannot create two instances
	public static synchronized KafkaConsumerCache getInstance() {
		if (kafkaconscache == null) {
			kafkaconscache = new KafkaConsumerCache();
		}
		return kafkaconscache;
	}

	private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
	private static final int kDefault_ConsumerHandoverWaitMs = 500;

	private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
	private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";

	private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
	private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";

	// Kafka defaults to timing out a client after 6 seconds of inactivity, but
	// it heartbeats even when the client isn't fetching. Here, we don't want
	// to prematurely rebalance the consumer group. Assuming clients are
	// hitting the server at least every 30 seconds, timing out after 2 minutes
	// should be okay.
	// FIXME: consider allowing the client to specify its expected call rate?
	private static final long kDefault_MustTouchEveryMs = 1000L * 60 * 2;

	// check for expirations pretty regularly
	private static final long kDefault_SweepEverySeconds = 15;
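
	// These settings can be overridden through the CambriaConstants.msgRtr_prop
	// properties file read via AJSCPropertiesMap; the values below are
	// illustrative, not defaults:
	//
	//   cambria.consumer.cache.sweepFreqSeconds=30
	//   cambria.consumer.cache.touchFreqMs=180000
	//   cambria.consumer.cache.handoverWaitMs=1000
	//   cambria.consumer.cache.zkBasePath=/cambria/consumerCache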

	private enum Status {
		NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
	}

	@Autowired
	private DMaaPErrorMessages errorMessages;

	/**
	 * User defined exception class for kafka consumer cache
	 * 
	 * @author nilanjana.maity
	 *
	 */
	public class KafkaConsumerCacheException extends Exception {
		/**
		 * Wraps an underlying cause.
		 * 
		 * @param t the cause
		 */
		KafkaConsumerCacheException(Throwable t) {
			super(t);
		}

		/**
		 * Creates an exception with the given message.
		 * 
		 * @param s the error message
		 */
		public KafkaConsumerCacheException(String s) {
			super(s);
		}

		private static final long serialVersionUID = 1L;
	}

	/**
	 * Creates a KafkaConsumerCache object. Before it is used, you must call
	 * startCache().
	 */
	public KafkaConsumerCache() {

		String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSetting_ZkBasePath);
		if (null == strkSetting_ZkBasePath) {
			strkSetting_ZkBasePath = kDefault_ZkBasePath;
		}
		fBaseZkPath = strkSetting_ZkBasePath;

		fConsumers = new ConcurrentHashMap<>();
		fSweepScheduler = Executors.newScheduledThreadPool(1);

		curatorConsumerCache = null;

		status = Status.NOT_STARTED;

		// Watcher for consumer rebalancing across nodes; Kafka 0.11
		// rebalancing workaround.
		listener = new ConnectionStateListener() {
			public void stateChanged(CuratorFramework client, ConnectionState newState) {
				if (newState == ConnectionState.LOST) {
					log.info("ZooKeeper connection expired");
					handleConnectionLoss();
				} else if (newState == ConnectionState.READ_ONLY) {
					log.warn("ZooKeeper connection set to read only mode.");
				} else if (newState == ConnectionState.RECONNECTED) {
					log.info("ZooKeeper connection re-established");
					handleReconnection();
				} else if (newState == ConnectionState.SUSPENDED) {
					log.warn("ZooKeeper connection has been suspended.");
					handleConnectionSuspended();
				}
			}
		};
	}

	/**
	 * Start the cache service. This must be called before any get/put
	 * operations.
	 * 
	 * @param mode
	 *            DMAAP or cambria
	 * @param curator
	 * @throws KafkaConsumerCacheException
	 */
	public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {

		if (fApiId == null) {
			throw new IllegalArgumentException("API Node ID must be specified.");
		}

		try {
			if (CambriaConstants.DMAAP.equals(mode)) {
				curator = getCuratorFramework(curator);
			}
			curator.getConnectionStateListenable().addListener(listener);
			setStatus(Status.CONNECTED);
			curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
			curatorConsumerCache.start();
			curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
					switch (event.getType()) {
					case CHILD_ADDED: {
						try {
							final String apiId = new String(event.getData().getData());
							final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

							log.info(apiId + " started consumer " + consumer);
						} catch (Exception ex) {
							log.info("#Error occurred while adding child ", ex);
						}
						break;
					}
					case CHILD_UPDATED: {
						final String apiId = new String(event.getData().getData());
						final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

						if (fConsumers.containsKey(consumer)) {
							log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
									+ " but won't hand over");
							// Don't hand over the connection until the active
							// node for this client id has released it.
							dropClaimedConsumer(consumer);
						}

						break;
					}
					case CHILD_REMOVED: {
						final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

						if (fConsumers.containsKey(consumer)) {
							log.info("Someone wanted consumer " + consumer + " gone; removing it from the cache");
							dropConsumer(consumer, false);
						}

						break;
					}

					default:
						break;
					}
				}
			});

			// initialize the ZK path
			EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
			ensurePath.ensure(curator.getZookeeperClient());

			long freq = kDefault_SweepEverySeconds;
			String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_SweepEverySeconds);
			if (null != strkSetting_SweepEverySeconds) {
				freq = Long.parseLong(strkSetting_SweepEverySeconds);
			}

			fSweepScheduler.scheduleAtFixedRate(new Sweeper(), freq, freq, TimeUnit.SECONDS);
			log.info("KafkaConsumerCache started");
			log.info("sweeping cached clients every " + freq + " seconds");
		} catch (ZkException e) {
			log.error("@@@@@@ ZK Exception occurred: ", e);
			throw new KafkaConsumerCacheException(e);
		} catch (Exception e) {
			log.error("@@@@@@ Exception occurred: ", e);
			throw new KafkaConsumerCacheException(e);
		}
	}

	/**
	 * Gets the curator object, starting it if the ZooKeeper connection has not
	 * yet been established.
	 * 
	 * @param curator
	 * @return curator object
	 */
	public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
		if (curator.getState() == CuratorFrameworkState.LATENT) {
			curator.start();

			try {
				curator.blockUntilConnected();
			} catch (InterruptedException e) {
				log.error("error while setting curator framework :", e);
				Thread.currentThread().interrupt();
			}
		}

		return curator;
	}

	/**
	 * Stop the cache service.
	 */
	public void stopCache() {
		setStatus(Status.DISCONNECTED);

		final CuratorFramework curator = ConfigurationReader.getCurator();

		// guard against stopCache() being called before startCache()
		if (curator != null && curatorConsumerCache != null) {
			try {
				curator.getConnectionStateListenable().removeListener(listener);
				curatorConsumerCache.close();
				log.info("Curator client closed");
			} catch (ZkInterruptedException e) {
				log.warn("Curator client close interrupted: ", e);
			} catch (IOException e) {
				log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
			}

			curatorConsumerCache = null;
		}

		if (fSweepScheduler != null) {
			fSweepScheduler.shutdownNow();
			log.info("cache sweeper stopped");
		}

		if (fConsumers != null) {
			fConsumers.clear();
			fConsumers = null;
		}

		setStatus(Status.NOT_STARTED);

		log.info("Consumer cache service stopped");
	}

	/**
	 * Get a cached consumer by topic, group, and id, if it exists (and remains
	 * valid). In addition, this method waits for all other consumer caches in
	 * the cluster to release their ownership and delete their version of this
	 * consumer.
	 * 
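	 * <p>An illustrative get-or-create flow as a caller might use it (the
	 * createConsumer helper is hypothetical, not part of this class):
	 * <pre>{@code
	 * Kafka011Consumer kc = cache.getConsumerFor(topic, group, clientId);
	 * if (kc == null) {
	 *     kc = createConsumer(topic, group, clientId); // hypothetical
	 *     cache.signalOwnership(topic, group, clientId);
	 *     cache.putConsumerFor(topic, group, clientId, kc);
	 * }
	 * }</pre>
	 * 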
	 * @param topic
	 * @param consumerGroupId
	 * @param clientId
	 * @return a consumer, or null
	 */
	public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
			throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED) {
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		}

		final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
		final Kafka011Consumer kc = fConsumers.get(consumerKey);

		if (kc != null) {
			log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
			kc.touch();
			fMetrics.onKafkaConsumerCacheHit();
		} else {
			log.debug("Consumer cache miss for [" + consumerKey + "]");
			fMetrics.onKafkaConsumerCacheMiss();
		}

		return kc;
	}

	/**
	 * Get all cached consumers whose keys start with the given
	 * topic::consumerGroup prefix, excluding the one owned by the given
	 * client id.
	 * 
	 * @param topicgroup
	 * @param clientId
	 * @return a list of matching consumers, possibly empty
	 */
	public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
			throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED) {
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		}
		ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

		Enumeration<String> strEnum = fConsumers.keys();
		String consumerLocalKey = null;
		while (strEnum.hasMoreElements()) {
			consumerLocalKey = strEnum.nextElement();

			if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
				kcl.add(fConsumers.get(consumerLocalKey));
			}
		}

		return kcl;
	}

	public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED) {
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		}
		ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

		Enumeration<String> strEnum = fConsumers.keys();
		String consumerLocalKey = null;
		while (strEnum.hasMoreElements()) {
			consumerLocalKey = strEnum.nextElement();

			if (consumerLocalKey.startsWith(group)) {
				kcl.add(fConsumers.get(consumerLocalKey));
			}
		}

		return kcl;
	}

	/**
	 * Put a consumer into the cache by topic, group and ID
	 * 
	 * @param topic
	 * @param consumerGroupId
	 * @param consumerId
	 * @param consumer
	 * @throws KafkaConsumerCacheException
	 */
	public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
			throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED) {
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		}

		final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
		fConsumers.put(consumerKey, consumer);

		log.info("^@ Consumer added to cache; consumerKey=" + consumerKey + " apiId=" + fApiId);
	}

	public Collection<? extends Consumer> getConsumers() {
		return new LinkedList<>(fConsumers.values());
	}

	/**
	 * Drops all consumers from the cache.
	 */
	public void dropAllConsumers() {
		for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
			dropConsumer(entry.getKey(), true);
		}

		// consumers should be empty here
		if (!fConsumers.isEmpty()) {
			log.warn("During dropAllConsumers, the consumer map is not empty.");
			fConsumers.clear();
		}
	}

	/**
	 * Drop a consumer from our cache due to a timeout
	 * 
	 * @param key
	 */
	private void dropTimedOutConsumer(String key) {
		fMetrics.onKafkaConsumerTimeout();

		if (!fConsumers.containsKey(key)) {
			log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
			return;
		}

		// First, drop this consumer from our cache
		boolean isdrop = dropConsumer(key, true);
		if (!isdrop) {
			return;
		}
		final CuratorFramework curator = ConfigurationReader.getCurator();

		try {
			curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
			log.info(" ^ deleted " + fBaseZkPath + "/" + key);
		} catch (NoNodeException e) {
			log.warn("A consumer was deleted from " + fApiId
					+ "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
		} catch (Exception e) {
			log.warn("Unexpected exception while deleting consumer: ", e);
		}

		try {
			int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
			String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_ConsumerHandoverWaitMs);
			if (strkSetting_ConsumerHandoverWaitMs != null) {
				consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
			}
			Thread.sleep(consumerHandoverWaitMs);
		} catch (InterruptedException e) {
			log.error("InterruptedException in dropTimedOutConsumer", e);
			Thread.currentThread().interrupt();
		}
		log.info("Dropped " + key + " consumer due to timeout");
	}

	/**
	 * Drop a consumer from our cache due to another API node claiming it as
	 * their own.
	 * 
	 * @param key
	 */
	private void dropClaimedConsumer(String key) {
		// if the consumer is still in our cache, it implies a claim.
		if (fConsumers.containsKey(key)) {
			fMetrics.onKafkaConsumerClaimed();
			log.info("Consumer [" + key + "] claimed by another node.");
		}
		log.info("^dropping claimed Kafka consumer " + key);
		dropConsumer(key, false);
	}

	/**
	 * Removes the consumer from the cache and closes its connection to the
	 * kafka broker(s).
	 * 
	 * @param key
	 * @param dueToTimeout
	 * @return false if the consumer could not be closed (it is left in the cache)
	 */
	private boolean dropConsumer(String key, boolean dueToTimeout) {
		final Kafka011Consumer kc = fConsumers.get(key);
		log.info("closing Kafka consumer " + key + " object " + kc);
		if (kc != null) {
			if (kc.close()) {
				fConsumers.remove(key);
			} else {
				return false;
			}
		}
		return true;
	}

	private MetricsSet fMetrics;
	private final String fBaseZkPath;
	private final ScheduledExecutorService fSweepScheduler;
	private String fApiId;

	public void setfMetrics(final MetricsSet metrics) {
		this.fMetrics = metrics;
	}

	public void setfApiId(final String id) {
		this.fApiId = id;
	}

	private final ConnectionStateListener listener;

	private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
	private PathChildrenCache curatorConsumerCache;

	private volatile Status status;

	private void handleReconnection() {

		log.info("Reading current cache data from ZK and synchronizing local cache");
		final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
		// Remove all the consumers in this API node's cache that now belong to
		// other API nodes. Note: the consumer factory still holds its lock for
		// each removed entry; that is not handled here.
		for (ChildData cachedConsumer : cacheData) {
			final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
			final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
					: "undefined";
			if (!fApiId.equals(owningApiId)) {
				fConsumers.remove(consumerId);
				log.info("@@@ Validating current cache data from ZK and synchronizing local cache; " + owningApiId
						+ " owns " + consumerId + ", removing it locally");
			}
		}

		setStatus(Status.CONNECTED);
	}

	private void handleConnectionSuspended() {
		log.info("Suspending cache until ZK connection is re-established");

		setStatus(Status.SUSPENDED);
	}

	private void handleConnectionLoss() {
		log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");

		setStatus(Status.DISCONNECTED);

		closeAllCachedConsumers();
		fConsumers.clear();
	}

	private void closeAllCachedConsumers() {
		for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
			try {
				entry.getValue().close();
			} catch (Exception e) {
				log.info("@@@@@@ Error occurred while closing cached consumer ", e);
			}
		}
	}

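	// Cache keys have the form "<topic>::<consumerGroupId>::<clientId>"; the
	// values below are illustrative:
	//   makeConsumerKey("myTopic", "cg1", "client42") -> "myTopic::cg1::client42"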
	private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
		return topic + "::" + consumerGroupId + "::" + clientId;
	}

	/**
	 * Claims ownership of a consumer by writing this node's API id to the
	 * consumer's lock node in ZooKeeper.
	 * 
	 * @param topic
	 * @param consumerGroupId
	 * @param consumerId
	 * @throws KafkaConsumerCacheException
	 */
	public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
			throws KafkaConsumerCacheException {
		// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
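		// e.g. with the default base path this resolves to a node like
		// <kDefault_ZkRoot>/consumerCache/myTopic::cg1::client42 (illustrative values)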
		final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);

		try (final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
			final String consumerPath = fBaseZkPath + "/" + consumerKey;
			log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
			final CuratorFramework curator = ConfigurationReader.getCurator();

			try {
				curator.setData().forPath(consumerPath, fApiId.getBytes());
			} catch (KeeperException.NoNodeException e) {
				log.info("KeeperException.NoNodeException occurred", e);
				curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
			}
			log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
			timer.end();
		} catch (Exception e) {
			log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
			throw new KafkaConsumerCacheException(e);
		}

		log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");

		try {
			int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
			String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_ConsumerHandoverWaitMs);
			if (strkSetting_ConsumerHandoverWaitMs != null) {
				consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
			}
			Thread.sleep(consumerHandoverWaitMs);
		} catch (InterruptedException e) {
			log.error("InterruptedException in signalOwnership", e);
			Thread.currentThread().interrupt();
		}
	}

	public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
		return null;
	}

	public void sweep() {
		final LinkedList<String> removals = new LinkedList<>();
		long mustTouchEveryMs = kDefault_MustTouchEveryMs;
		String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSetting_TouchEveryMs);
		if (null != strkSetting_TouchEveryMs) {
			mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
		}

		final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;

		for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
			final long lastTouchMs = e.getValue().getLastTouch();
			log.debug("consumer " + e.getKey() + " " + lastTouchMs + " < " + oldestAllowedTouchMs);

			if (lastTouchMs < oldestAllowedTouchMs) {
				log.info("consumer " + e.getKey() + " has expired");
				removals.add(e.getKey());
			}
		}

		for (String key : removals) {
			dropTimedOutConsumer(key);
		}
	}

	/**
	 * Runnable that invokes the sweep method on a schedule
	 * 
	 * @author nilanjana.maity
	 *
	 */
	private class Sweeper implements Runnable {
		/**
		 * run method
		 */
		public void run() {
			sweep();
		}
	}

	/**
	 * Drops a consumer from the cache by topic, group, and client id.
	 * 
	 * @param topic
	 * @param consumerGroup
	 * @param clientId
	 */
	public void dropConsumer(String topic, String consumerGroup, String clientId) {
		dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
	}

	private Status getStatus() {
		return this.status;
	}

	private void setStatus(Status status) {
		this.status = status;
	}

	private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);

}