/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.nsa.cambria.backends.kafka;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.http.annotation.NotThreadSafe;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.cambria.backends.Consumer;
import com.att.nsa.cambria.backends.MetricsSet;
import com.att.nsa.cambria.constants.CambriaConstants;
import com.att.nsa.cambria.utils.ConfigurationReader;

/**
 * Cache of active KafkaConsumer objects, keyed by topic, consumer group, and
 * client id.
 *
 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
 *                must serialize access to it.
 *
 * @author author
 */
@NotThreadSafe
public class KafkaConsumerCache {

    private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
    private static final int kDefault_ConsumerHandoverWaitMs = 500;

    private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
    private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";

    private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
    private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";

    // Kafka defaults to timing out a client after 6 seconds of inactivity, but
    // it heartbeats even when the client isn't fetching. Here, we don't want to
    // prematurely rebalance the consumer group. Assuming clients are hitting the
    // server at least every 30 seconds, timing out after 2 minutes should be okay.
    // FIXME: consider allowing the client to specify its expected call rate?
    private static final long kDefault_MustTouchEveryMs = (long) 1000 * 60 * 2;

    // check for expirations pretty regularly
    private static final long kDefault_SweepEverySeconds = 15;

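    // For illustration, these settings would be supplied through the property
    // source named by CambriaConstants.msgRtr_prop. A hypothetical entry set
    // (values shown are just the defaults above; the actual ZK base path
    // depends on CambriaConstants.kDefault_ZkRoot):
    //
    //   cambria.consumer.cache.handoverWaitMs=500
    //   cambria.consumer.cache.sweepFreqSeconds=15
    //   cambria.consumer.cache.touchFreqMs=120000
    //   cambria.consumer.cache.zkBasePath=<kDefault_ZkRoot>/consumerCache
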
    private enum Status {
        NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
    }

    /**
     * User-defined exception class for the Kafka consumer cache
     *
     * @author author
     */
    public class KafkaConsumerCacheException extends Exception {
        /**
         * Wrap an underlying cause
         *
         * @param t the cause
         */
        KafkaConsumerCacheException(Throwable t) {
            super(t);
        }

        /**
         * @param s the exception message
         */
        public KafkaConsumerCacheException(String s) {
            super(s);
        }

        private static final long serialVersionUID = 1L;
    }

    /**
     * Creates a KafkaConsumerCache object. Before it is used, you must call
     * startCache().
     *
     * @param apiId the unique id of this API node
     * @param metrics the metrics set used to report cache events
     */
    public KafkaConsumerCache(String apiId, MetricsSet metrics) {

        if (apiId == null) {
            throw new IllegalArgumentException("API Node ID must be specified.");
        }

        fApiId = apiId;
        fMetrics = metrics;

        String zkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_ZkBasePath);
        if (zkBasePath == null) {
            zkBasePath = kDefault_ZkBasePath;
        }
        fBaseZkPath = zkBasePath;

        fConsumers = new ConcurrentHashMap<String, KafkaConsumer>();
        fSweepScheduler = Executors.newScheduledThreadPool(1);

        curatorConsumerCache = null;

        status = Status.NOT_STARTED;

        listener = new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.LOST) {
                    log.info("ZooKeeper connection expired");
                    handleConnectionLoss();
                } else if (newState == ConnectionState.READ_ONLY) {
                    log.warn("ZooKeeper connection set to read-only mode.");
                } else if (newState == ConnectionState.RECONNECTED) {
                    log.info("ZooKeeper connection re-established");
                    handleReconnection();
                } else if (newState == ConnectionState.SUSPENDED) {
                    log.warn("ZooKeeper connection has been suspended.");
                    handleConnectionSuspended();
                }
            }
        };
    }
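
    // Typical lifecycle, as a hedged sketch (the factory wiring and the
    // curator/metrics instances are assumed, not provided by this class):
    //
    //   KafkaConsumerCache cache = new KafkaConsumerCache("node-1", metrics);
    //   cache.startCache(CambriaConstants.DMAAP, curator);
    //   KafkaConsumer kc = cache.getConsumerFor("myTopic", "myGroup", "client-1");
    //   ...
    //   cache.stopCache();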

    /**
     * Start the cache service. This must be called before any get/put
     * operations.
     *
     * @param mode
     *            DMAAP or cambria
     * @param curator
     * @throws KafkaConsumerCacheException
     */
    public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
        try {
            // Pick up the Curator framework according to the deployment mode.
            if (mode != null && mode.equals(CambriaConstants.CAMBRIA)) {
                curator = ConfigurationReader.getCurator();
            } else if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
                curator = getCuratorFramework(curator);
            }

            curator.getConnectionStateListenable().addListener(listener);

            setStatus(Status.CONNECTED);

            curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
            curatorConsumerCache.start();

            curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                    case CHILD_ADDED: {
                        final String apiId = new String(event.getData().getData());
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        log.info(apiId + " started consumer " + consumer);
                        break;
                    }
                    case CHILD_UPDATED: {
                        final String apiId = new String(event.getData().getData());
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        if (fConsumers.containsKey(consumer)) {
                            log.info(apiId + " claimed consumer " + consumer + " from " + fApiId);

                            dropClaimedConsumer(consumer);
                        }

                        break;
                    }
                    case CHILD_REMOVED: {
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        if (fConsumers.containsKey(consumer)) {
                            log.info("Someone wanted consumer " + consumer + " gone; removing it from the cache");
                            dropConsumer(consumer, false);
                        }

                        break;
                    }
                    default:
                        break;
                    }
                }
            });

            // initialize the ZK path
            EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
            ensurePath.ensure(curator.getZookeeperClient());

            long freq = kDefault_SweepEverySeconds;
            final String strSweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    kSetting_SweepEverySeconds);
            if (strSweepEverySeconds != null) {
                freq = Long.parseLong(strSweepEverySeconds);
            }

            fSweepScheduler.scheduleAtFixedRate(new Sweeper(), freq, freq, TimeUnit.SECONDS);
            log.info("KafkaConsumerCache started");
            log.info("sweeping cached clients every " + freq + " seconds");
        } catch (ZkException e) {
            throw new KafkaConsumerCacheException(e);
        } catch (Exception e) {
            throw new KafkaConsumerCacheException(e);
        }
    }
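
    // Cross-node consumer handover, as implemented by the listener above plus
    // signalOwnership() below:
    //   - a node claims a consumer by writing its apiId into the consumer's
    //     znode under fBaseZkPath;
    //   - any other node still holding that consumer locally sees
    //     CHILD_UPDATED and drops its copy via dropClaimedConsumer();
    //   - CHILD_REMOVED evicts the consumer from every node's cache.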

    /**
     * Start the given Curator client, if it has not been started yet, and
     * block until the ZooKeeper connection is established.
     *
     * @param curator
     * @return the started curator object
     */
    public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
        if (curator.getState() == CuratorFrameworkState.LATENT) {
            curator.start();

            try {
                curator.blockUntilConnected();
            } catch (InterruptedException e) {
                log.error("error while starting curator framework: " + e.getMessage());
                Thread.currentThread().interrupt();
            }
        }

        return curator;
    }

    /**
     * Stop the cache service.
     */
    public void stopCache() {
        setStatus(Status.DISCONNECTED);

        final CuratorFramework curator = ConfigurationReader.getCurator();

        if (curator != null) {
            try {
                curator.getConnectionStateListenable().removeListener(listener);
                curatorConsumerCache.close();
                log.info("Curator client closed");
            } catch (ZkInterruptedException e) {
                log.error("Curator client close interrupted: " + e);
            } catch (IOException e) {
                log.error("Error while closing curator PathChildrenCache for KafkaConsumerCache: " + e);
            }

            curatorConsumerCache = null;
        }

        if (fSweepScheduler != null) {
            fSweepScheduler.shutdownNow();
            log.info("cache sweeper stopped");
        }

        if (fConsumers != null) {
            fConsumers.clear();
            fConsumers = null;
        }

        setStatus(Status.NOT_STARTED);

        log.info("Consumer cache service stopped");
    }

    /**
     * Get a cached consumer by topic, group, and id, if it exists (and remains
     * valid). In addition, this method waits for all other consumer caches in
     * the cluster to release their ownership and delete their version of this
     * consumer.
     *
     * @param topic
     * @param consumerGroupId
     * @param clientId
     * @return a consumer, or null
     * @throws KafkaConsumerCacheException if the cache service is unavailable
     */
    public KafkaConsumer getConsumerFor(String topic, String consumerGroupId, String clientId)
            throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");

        final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
        final KafkaConsumer kc = fConsumers.get(consumerKey);

        if (kc != null) {
            log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
            kc.touch();
            fMetrics.onKafkaConsumerCacheHit();
        } else {
            log.debug("Consumer cache miss for [" + consumerKey + "]");
            fMetrics.onKafkaConsumerCacheMiss();
        }

        return kc;
    }

    /**
     * Put a consumer into the cache by topic, group and ID
     *
     * @param topic
     * @param consumerGroupId
     * @param consumerId
     * @param consumer
     * @throws KafkaConsumerCacheException if the cache service is unavailable
     */
    public void putConsumerFor(String topic, String consumerGroupId, String consumerId, KafkaConsumer consumer)
            throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");

        final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
        fConsumers.put(consumerKey, consumer);
    }

    /**
     * @return a snapshot of the currently cached consumers
     */
    public Collection<? extends Consumer> getConsumers() {
        return new LinkedList<KafkaConsumer>(fConsumers.values());
    }

    /**
     * Drop all consumers from this node's cache, closing each one.
     */
    public void dropAllConsumers() {
        for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
            dropConsumer(entry.getKey(), true);
        }

        // consumers should be empty here
        if (!fConsumers.isEmpty()) {
            log.warn("During dropAllConsumers, the consumer map is not empty.");
            fConsumers.clear();
        }
    }

    /**
     * Drop a consumer from our cache due to a timeout
     *
     * @param key
     */
    private void dropTimedOutConsumer(String key) {
        fMetrics.onKafkaConsumerTimeout();

        if (!fConsumers.containsKey(key)) {
            log.warn("Attempted to drop a timed-out consumer which was not in our cache: " + key);
            return;
        }

        // First, drop this consumer from our cache
        dropConsumer(key, true);

        final CuratorFramework curator = ConfigurationReader.getCurator();

        try {
            curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
        } catch (NoNodeException e) {
            log.error("No ZK node to delete for consumer " + key + ": " + e);
        } catch (Exception e) {
            log.error("Unexpected exception while deleting consumer: " + e);
        }

        log.info("Dropped " + key + " consumer due to timeout");
    }

    /**
     * Drop a consumer from our cache due to another API node claiming it as
     * their own.
     *
     * @param key
     */
    private void dropClaimedConsumer(String key) {
        // if the consumer is still in our cache, it implies a claim.
        if (fConsumers.containsKey(key)) {
            fMetrics.onKafkaConsumerClaimed();
            log.info("Consumer [" + key + "] claimed by another node.");
        }

        dropConsumer(key, false);
    }

    /**
     * Removes the consumer from the cache and closes its connection to the
     * kafka broker(s).
     *
     * @param key
     * @param dueToTimeout
     */
    private void dropConsumer(String key, boolean dueToTimeout) {
        final KafkaConsumer kc = fConsumers.remove(key);

        if (kc != null) {
            log.info("closing Kafka consumer " + key);
            kc.close();
        }
    }

    private final MetricsSet fMetrics;
    private final String fBaseZkPath;
    private final ScheduledExecutorService fSweepScheduler;
    private final String fApiId;
    private final ConnectionStateListener listener;

    private ConcurrentHashMap<String, KafkaConsumer> fConsumers;
    private PathChildrenCache curatorConsumerCache;

    private volatile Status status;

    private void handleReconnection() {

        log.info("Reading current cache data from ZK and synchronizing local cache");

        final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();

        // Remove all the consumers in this API node's cache that now belong to
        // other API nodes.
        for (ChildData cachedConsumer : cacheData) {
            final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
            final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
                    : "undefined";

            if (!fApiId.equals(owningApiId)) {
                fConsumers.remove(consumerId);
            }
        }

        setStatus(Status.CONNECTED);
    }

    private void handleConnectionSuspended() {
        log.info("Suspending cache until ZK connection is re-established");

        setStatus(Status.SUSPENDED);
    }

    private void handleConnectionLoss() {
        log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");

        setStatus(Status.DISCONNECTED);

        closeAllCachedConsumers();
        fConsumers.clear();
    }

    private void closeAllCachedConsumers() {
        for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
            entry.getValue().close();
        }
    }

    private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
        return topic + "::" + consumerGroupId + "::" + clientId;
    }
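
    // e.g. makeConsumerKey("myTopic", "myGroup", "client-1") yields
    // "myTopic::myGroup::client-1", which is also the consumer's znode name.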

    /**
     * Claim ownership of a consumer by writing this node's API id to the
     * consumer's znode, then back off to allow other nodes to release it.
     *
     * @param topic
     * @param consumerGroupId
     * @param consumerId
     * @throws KafkaConsumerCacheException
     */
    public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
            throws KafkaConsumerCacheException {
        // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
        final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);

        try {
            final String consumerPath = fBaseZkPath + "/" + consumerKey;

            log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);

            final CuratorFramework curator = ConfigurationReader.getCurator();

            try {
                curator.setData().forPath(consumerPath, fApiId.getBytes());
            } catch (KeeperException.NoNodeException e) {
                log.error(e.toString());
                curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
            }

            log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
        } catch (Exception e) {
            log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
            throw new KafkaConsumerCacheException(e);
        }

        log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");

        try {
            int handoverWaitMs = kDefault_ConsumerHandoverWaitMs;
            final String strHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    kSetting_ConsumerHandoverWaitMs);
            if (strHandoverWaitMs != null) {
                handoverWaitMs = Integer.parseInt(strHandoverWaitMs);
            }
            Thread.sleep(handoverWaitMs);
        } catch (InterruptedException e) {
            log.error(e.toString());
            Thread.currentThread().interrupt();
        }
    }
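
    // Resulting ZK layout, as an illustrative sketch (node names are
    // hypothetical; each znode's data is the owning node's API id):
    //
    //   <fBaseZkPath>/
    //     myTopic::myGroup::client-1   -> "node-1"
    //     myTopic::myGroup::client-2   -> "node-2"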

    private void sweep() {
        final LinkedList<String> removals = new LinkedList<String>();

        long mustTouchEveryMs = kDefault_MustTouchEveryMs;
        final String strTouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                kSetting_TouchEveryMs);
        if (strTouchEveryMs != null) {
            mustTouchEveryMs = Long.parseLong(strTouchEveryMs);
        }

        final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;

        for (Entry<String, KafkaConsumer> e : fConsumers.entrySet()) {
            final long lastTouchMs = e.getValue().getLastTouch();

            log.debug("consumer " + e.getKey() + " last touched at " + lastTouchMs);

            if (lastTouchMs < oldestAllowedTouchMs) {
                log.info("consumer " + e.getKey() + " has expired");
                removals.add(e.getKey());
            }
        }

        for (String key : removals) {
            dropTimedOutConsumer(key);
        }
    }
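
    // Worked example of the expiry arithmetic, under the defaults above:
    // with touchFreqMs = 120000 and sweepFreqSeconds = 15, a consumer last
    // touched at time T expires at T + 120s and is dropped by the first
    // sweep after that, so it lingers at most ~135s past its last touch.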

    /**
     * A scheduled task that runs the sweep method
     *
     * @author author
     */
    private class Sweeper implements Runnable {
        /**
         * run method
         */
        public void run() {
            sweep();
        }
    }

    /**
     * Drop a consumer from the cache by topic, group, and client id.
     *
     * @param topic
     * @param consumerGroup
     * @param clientId
     */
    public void dropConsumer(String topic, String consumerGroup, String clientId) {
        dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
    }

    private Status getStatus() {
        return this.status;
    }

    private void setStatus(Status status) {
        this.status = status;
    }

    private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
}