Fix sonar issues in dmaap-messagerouter-msgrtr
dmaap/messagerouter/msgrtr.git: src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.http.annotation.NotThreadSafe;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.springframework.beans.factory.annotation.Autowired;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.backends.MetricsSet;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;

/**
 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
 *                must be thread-safe
 * @author peter
 *
 */
@NotThreadSafe
public class KafkaConsumerCache {

    private static KafkaConsumerCache kafkaconscache = null;

    public static synchronized KafkaConsumerCache getInstance() {
        if (kafkaconscache == null)
            kafkaconscache = new KafkaConsumerCache();

        return kafkaconscache;
    }

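    // Illustrative usage (a sketch, not from the original source; "api-node-1"
    // is a hypothetical id): callers obtain the shared instance and must set
    // the API node id and metrics before starting the cache.
    //
    //   KafkaConsumerCache cache = KafkaConsumerCache.getInstance();
    //   cache.setfApiId("api-node-1");
    //   cache.setfMetrics(metrics); // a MetricsSet supplied by the caller
    //   cache.startCache(CambriaConstants.DMAAP, curator);
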
    private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
    private static final int kDefault_ConsumerHandoverWaitMs = 500;

    private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
    private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";

    private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
    private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";

    // Kafka defaults to timing out a client after 6 seconds of inactivity, but
    // it heartbeats even when the client isn't fetching. Here, we don't want to
    // prematurely rebalance the consumer group. Assuming clients are hitting
    // the server at least every 30 seconds, timing out after 2 minutes should
    // be okay.
    // FIXME: consider allowing the client to specify its expected call rate?
    private static final long kDefault_MustTouchEveryMs = 1000L * 60 * 2;

    // check for expirations pretty regularly
    private static final long kDefault_SweepEverySeconds = 15;

    private enum Status {
        NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
    }

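    // How these states are driven (see the ConnectionStateListener built in
    // the constructor and the handle* methods below):
    //   ConnectionState.LOST        -> DISCONNECTED (all cached consumers closed)
    //   ConnectionState.RECONNECTED -> CONNECTED    (local cache re-synced from ZK)
    //   ConnectionState.SUSPENDED   -> SUSPENDED    (cache paused until reconnect)
    // NOT_STARTED applies before startCache() and after stopCache().
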
    @Autowired
    private DMaaPErrorMessages errorMessages;

    /**
     * User-defined exception class for the Kafka consumer cache
     *
     * @author nilanjana.maity
     *
     */
    public class KafkaConsumerCacheException extends Exception {
        /**
         * Wraps the underlying cause
         *
         * @param t
         */
        KafkaConsumerCacheException(Throwable t) {
            super(t);
        }

        /**
         *
         * @param s
         */
        public KafkaConsumerCacheException(String s) {
            super(s);
        }

        private static final long serialVersionUID = 1L;
    }

    /**
     * Creates a KafkaConsumerCache object. Before it is used, you must call
     * startCache().
     */
    public KafkaConsumerCache() {

        String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                kSetting_ZkBasePath);
        if (null == strkSetting_ZkBasePath)
            strkSetting_ZkBasePath = kDefault_ZkBasePath;
        fBaseZkPath = strkSetting_ZkBasePath;

        fConsumers = new ConcurrentHashMap<>();
        fSweepScheduler = Executors.newScheduledThreadPool(1);

        curatorConsumerCache = null;

        status = Status.NOT_STARTED;

        // Watcher for consumer rebalancing across nodes; works around Kafka
        // 0.11 rebalancing behavior.
        listener = new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.LOST) {
                    log.info("ZooKeeper connection expired");
                    handleConnectionLoss();
                } else if (newState == ConnectionState.READ_ONLY) {
                    log.warn("ZooKeeper connection set to read only mode.");
                } else if (newState == ConnectionState.RECONNECTED) {
                    log.info("ZooKeeper connection re-established");
                    handleReconnection();
                } else if (newState == ConnectionState.SUSPENDED) {
                    log.warn("ZooKeeper connection has been suspended.");
                    handleConnectionSuspended();
                }
            }
        };
    }

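    // Configuration note (a sketch of assumed property usage, based on the
    // constants above): the ZK base path can be overridden in the MsgRtr
    // property file read via AJSCPropertiesMap, e.g. (hypothetical value):
    //
    //   cambria.consumer.cache.zkBasePath=/some/zk/path
    //
    // When absent, kDefault_ZkBasePath (CambriaConstants.kDefault_ZkRoot +
    // "/consumerCache") is used.
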
    /**
     * Start the cache service. This must be called before any get/put
     * operations.
     *
     * @param mode
     *            DMAAP or cambria
     * @param curator
     * @throws KafkaConsumerCacheException
     */
    public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {

        if (fApiId == null) {
            throw new IllegalArgumentException("API Node ID must be specified.");
        }

        try {

            if (CambriaConstants.DMAAP.equals(mode)) {
                curator = getCuratorFramework(curator);
            }
            curator.getConnectionStateListenable().addListener(listener);
            setStatus(Status.CONNECTED);
            curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
            curatorConsumerCache.start();
            curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                    case CHILD_ADDED: {
                        try {
                            final String apiId = new String(event.getData().getData());
                            final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                            log.info(apiId + " started consumer " + consumer);
                        } catch (Exception ex) {
                            log.info("#Error occurred while adding child ", ex);
                        }
                        break;
                    }
                    case CHILD_UPDATED: {
                        final String apiId = new String(event.getData().getData());
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        if (fConsumers.containsKey(consumer)) {
                            log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
                                    + " but won't hand over");
                            // Do not hand over the connection until the active
                            // node is running for this client id.
                            dropClaimedConsumer(consumer);
                        }

                        break;
                    }
                    case CHILD_REMOVED: {
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        if (fConsumers.containsKey(consumer)) {
                            log.info("Someone wanted consumer " + consumer
                                    + " gone; but not removing it from the cache");
                            dropConsumer(consumer, false);
                        }

                        break;
                    }

                    default:
                        break;
                    }
                }
            });

            // initialize the ZK path
            EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
            ensurePath.ensure(curator.getZookeeperClient());

            long freq = kDefault_SweepEverySeconds;
            String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    kSetting_SweepEverySeconds);
            if (null != strkSetting_SweepEverySeconds) {
                freq = Long.parseLong(strkSetting_SweepEverySeconds);
            }

            fSweepScheduler.scheduleAtFixedRate(new Sweeper(), freq, freq, TimeUnit.SECONDS);
            log.info("KafkaConsumerCache started");
            log.info("sweeping cached clients every " + freq + " seconds");
        } catch (ZkException e) {
            log.error("@@@@@@ ZK Exception occurred ", e);
            throw new KafkaConsumerCacheException(e);
        } catch (Exception e) {
            log.error("@@@@@@ Exception occurred ", e);
            throw new KafkaConsumerCacheException(e);
        }
    }

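    // Minimal start-up sketch (illustrative; the consumer factory normally
    // drives this). In DMAAP mode the curator client is (re)started via
    // getCuratorFramework, a PathChildrenCache watches fBaseZkPath for
    // claim/handover events, and a sweeper task expires idle consumers.
    //
    //   CuratorFramework curator = ConfigurationReader.getCurator();
    //   KafkaConsumerCache.getInstance().startCache(CambriaConstants.DMAAP, curator);
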
    /**
     * Gets the curator object and establishes the ZooKeeper connection if it
     * has not been started yet
     *
     * @param curator
     * @return curator object
     */
    public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
        if (curator.getState() == CuratorFrameworkState.LATENT) {
            curator.start();

            try {
                curator.blockUntilConnected();
            } catch (InterruptedException e) {
                log.error("error while setting curator framework :", e);
                Thread.currentThread().interrupt();
            }
        }

        return curator;
    }

    /**
     * Stop the cache service.
     */
    public void stopCache() {
        setStatus(Status.DISCONNECTED);

        final CuratorFramework curator = ConfigurationReader.getCurator();

        if (curator != null) {
            try {
                curator.getConnectionStateListenable().removeListener(listener);
                curatorConsumerCache.close();
                log.info("Curator client closed");
            } catch (ZkInterruptedException e) {
                log.warn("Curator client close interrupted: ", e);
            } catch (IOException e) {
                log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
            }

            curatorConsumerCache = null;
        }

        if (fSweepScheduler != null) {
            fSweepScheduler.shutdownNow();
            log.info("cache sweeper stopped");
        }

        if (fConsumers != null) {
            fConsumers.clear();
            fConsumers = null;
        }

        setStatus(Status.NOT_STARTED);

        log.info("Consumer cache service stopped");
    }

    /**
     * Get a cached consumer by topic, group, and id, if it exists (and remains
     * valid). In addition, this method waits for all other consumer caches in
     * the cluster to release their ownership and delete their version of this
     * consumer.
     *
     * @param topic
     * @param consumerGroupId
     * @param clientId
     * @return a consumer, or null
     */
    public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
            throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");

        final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
        final Kafka011Consumer kc = fConsumers.get(consumerKey);

        if (kc != null) {
            log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
            kc.touch();
            fMetrics.onKafkaConsumerCacheHit();
        } else {
            log.debug("Consumer cache miss for [" + consumerKey + "]");
            fMetrics.onKafkaConsumerCacheMiss();
        }

        return kc;
    }

    /**
     * Get all cached consumers whose key starts with the given topic/group
     * prefix, excluding the one owned by the given client id.
     *
     * @param topicgroup
     * @param clientId
     * @return a list of consumers, possibly empty
     */
    public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
            throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");
        ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

        Enumeration<String> strEnum = fConsumers.keys();
        String consumerLocalKey = null;
        while (strEnum.hasMoreElements()) {
            consumerLocalKey = strEnum.nextElement();

            if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
                kcl.add(fConsumers.get(consumerLocalKey));
            }
        }

        return kcl;
    }

    /**
     * Get all cached consumers whose key starts with the given group prefix.
     *
     * @param group
     * @return a list of consumers, possibly empty
     */
    public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");
        ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

        Enumeration<String> strEnum = fConsumers.keys();
        String consumerLocalKey = null;
        while (strEnum.hasMoreElements()) {
            consumerLocalKey = strEnum.nextElement();

            if (consumerLocalKey.startsWith(group)) {
                kcl.add(fConsumers.get(consumerLocalKey));
            }
        }

        return kcl;
    }

    /**
     * Put a consumer into the cache by topic, group and ID
     *
     * @param topic
     * @param consumerGroupId
     * @param consumerId
     * @param consumer
     * @throws KafkaConsumerCacheException
     */
    public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
            throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");

        final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
        fConsumers.put(consumerKey, consumer);

        log.info("^@ Consumer added to cache; consumer key " + consumerKey + ", apiId " + fApiId);
    }

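    // Typical get/put cycle (a sketch, not from the original source;
    // createConsumer is a hypothetical factory call): a cache miss is followed
    // by creating a Kafka011Consumer elsewhere and registering it under the
    // same topic/group/client key used by getConsumerFor.
    //
    //   Kafka011Consumer kc = cache.getConsumerFor(topic, group, clientId);
    //   if (kc == null) {
    //       kc = createConsumer(topic, group, clientId);
    //       cache.putConsumerFor(topic, group, clientId, kc);
    //   }
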
    public Collection<? extends Consumer> getConsumers() {
        return new LinkedList<>(fConsumers.values());
    }

    /**
     * Drops all consumers from the cache.
     */
    public void dropAllConsumers() {
        for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
            dropConsumer(entry.getKey(), true);
        }

        // consumers should be empty here
        if (!fConsumers.isEmpty()) {
            log.warn("During dropAllConsumers, the consumer map is not empty.");
            fConsumers.clear();
        }
    }

    /**
     * Drop a consumer from our cache due to a timeout
     *
     * @param key
     */
    private void dropTimedOutConsumer(String key) {
        fMetrics.onKafkaConsumerTimeout();

        if (!fConsumers.containsKey(key)) {
            log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
            return;
        }

        // First, drop this consumer from our cache
        boolean isdrop = dropConsumer(key, true);
        if (!isdrop) {
            return;
        }
        final CuratorFramework curator = ConfigurationReader.getCurator();

        try {
            curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
            log.info(" ^ deleted " + fBaseZkPath + "/" + key);
        } catch (NoNodeException e) {
            log.warn("A consumer was deleted from " + fApiId
                    + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
        } catch (Exception e) {
            log.warn("Unexpected exception while deleting consumer: ", e);
        }

        try {
            int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
            String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    kSetting_ConsumerHandoverWaitMs);
            if (strkSetting_ConsumerHandoverWaitMs != null)
                consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
            Thread.sleep(consumerHandoverWaitMs);
        } catch (InterruptedException e) {
            log.error("InterruptedException in dropTimedOutConsumer", e);
            Thread.currentThread().interrupt();
        }
        log.info("Dropped " + key + " consumer due to timeout");
    }

    /**
     * Drop a consumer from our cache due to another API node claiming it as
     * their own.
     *
     * @param key
     */
    private void dropClaimedConsumer(String key) {
        // if the consumer is still in our cache, it implies a claim
        if (fConsumers.containsKey(key)) {
            fMetrics.onKafkaConsumerClaimed();
            log.info("Consumer [" + key + "] claimed by another node.");
        }
        log.info("^dropping claimed Kafka consumer " + key);
        dropConsumer(key, false);
    }

    /**
     * Removes the consumer from the cache and closes its connection to the
     * kafka broker(s).
     *
     * @param key
     * @param dueToTimeout
     * @return true if the consumer was closed and removed (or was absent)
     */
    private boolean dropConsumer(String key, boolean dueToTimeout) {
        final Kafka011Consumer kc = fConsumers.get(key);
        log.info("closing Kafka consumer " + key + " object " + kc);
        if (kc != null) {
            if (kc.close()) {
                fConsumers.remove(key);
            } else {
                return false;
            }
        }
        return true;
    }

    // private final rrNvReadable fSettings;
    private MetricsSet fMetrics;
    private final String fBaseZkPath;
    private final ScheduledExecutorService fSweepScheduler;
    private String fApiId;

    public void setfMetrics(final MetricsSet metrics) {
        this.fMetrics = metrics;
    }

    public void setfApiId(final String id) {
        this.fApiId = id;
    }

    private final ConnectionStateListener listener;

    private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
    private PathChildrenCache curatorConsumerCache;

    private volatile Status status;

    private void handleReconnection() {

        log.info("Reading current cache data from ZK and synchronizing local cache");
        final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
        // Remove all the consumers in this API node's cache that now belong to
        // other API nodes.
        for (ChildData cachedConsumer : cacheData) {
            final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
            final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
                    : "undefined";
            if (!fApiId.equals(owningApiId)) {
                // The consumer is removed from the local map, but any lock
                // still exists; this is not handled in the Kafka consumer
                // factory.
                fConsumers.remove(consumerId);
                log.info("@@@ Validating current cache data from ZK and synchronizing local cache; " + owningApiId
                        + " now owns " + consumerId + ", removing it locally");
            }
        }

        setStatus(Status.CONNECTED);
    }

    private void handleConnectionSuspended() {
        log.info("Suspending cache until ZK connection is re-established");

        setStatus(Status.SUSPENDED);
    }

    private void handleConnectionLoss() {
        log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");

        setStatus(Status.DISCONNECTED);

        closeAllCachedConsumers();
        fConsumers.clear();
    }

    private void closeAllCachedConsumers() {
        for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception e) {
                log.info("@@@@@@ Error occurred while closing all cached consumers ", e);
            }
        }
    }

    private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
        return topic + "::" + consumerGroupId + "::" + clientId;
    }

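    // Key format example: makeConsumerKey("myTopic", "myGroup", "client1")
    // yields "myTopic::myGroup::client1". The "::" separator is what the
    // prefix/suffix matching in getConsumerListForCG relies on.
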
    /**
     * Claims ownership of a consumer by taking a lock in ZooKeeper.
     *
     * @param topic
     * @param consumerGroupId
     * @param consumerId
     * @throws KafkaConsumerCacheException
     */
    public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
            throws KafkaConsumerCacheException {
        // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
        final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);

        try (final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
            final String consumerPath = fBaseZkPath + "/" + consumerKey;
            log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
            final CuratorFramework curator = ConfigurationReader.getCurator();

            try {
                curator.setData().forPath(consumerPath, fApiId.getBytes());
            } catch (KeeperException.NoNodeException e) {
                log.info("KeeperException.NoNodeException occurred", e);
                curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
            }
            log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
            timer.end();
        } catch (Exception e) {
            log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
            throw new KafkaConsumerCacheException(e);
        }

        log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");

        try {
            int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
            String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    kSetting_ConsumerHandoverWaitMs);
            if (strkSetting_ConsumerHandoverWaitMs != null)
                consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
            Thread.sleep(consumerHandoverWaitMs);
        } catch (InterruptedException e) {
            log.error("InterruptedException in signalOwnership", e);
            Thread.currentThread().interrupt();
        }
    }

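    // Ownership layout in ZooKeeper (derived from the method above): each
    // consumer is represented by a node at
    //
    //   <fBaseZkPath>/<topic>::<consumerGroupId>::<consumerId>
    //
    // whose data is the owning API node id (fApiId). setData() claims an
    // existing node; on NoNodeException the node is created, with parents,
    // as needed.
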
    public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
        return null;
    }

    public void sweep() {
        final LinkedList<String> removals = new LinkedList<>();
        long mustTouchEveryMs = kDefault_MustTouchEveryMs;
        String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                kSetting_TouchEveryMs);
        if (null != strkSetting_TouchEveryMs) {
            mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
        }

        final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;

        for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
            final long lastTouchMs = e.getValue().getLastTouch();
            log.debug("consumer " + e.getKey() + ": last touch " + lastTouchMs + " < " + oldestAllowedTouchMs + "?");

            if (lastTouchMs < oldestAllowedTouchMs) {
                log.info("consumer " + e.getKey() + " has expired");
                removals.add(e.getKey());
            }
        }

        for (String key : removals) {
            dropTimedOutConsumer(key);
        }
    }

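    // Expiry arithmetic example (using the defaults above): with
    // kDefault_MustTouchEveryMs = 120000 (1000 * 60 * 2), a consumer whose
    // last touch is more than two minutes before the sweep runs falls below
    // oldestAllowedTouchMs and is handed to dropTimedOutConsumer.
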
    /**
     * Runnable that periodically invokes the sweep method
     *
     * @author nilanjana.maity
     *
     */
    private class Sweeper implements Runnable {
        /**
         * run method
         */
        public void run() {
            sweep();
        }
    }

    /**
     * Drops the consumer for the given topic, consumer group, and client id.
     *
     * @param topic
     * @param consumerGroup
     * @param clientId
     */
    public void dropConsumer(String topic, String consumerGroup, String clientId) {
        dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
    }

    private Status getStatus() {
        return this.status;
    }

    private void setStatus(Status status) {
        this.status = status;
    }

    private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);

}