Add Initial Code Import
[dmaap/messagerouter/msgrtr.git] src/main/java/org/onap/dmaap/messagerouter/msgrtr/nsa/cambria/backends/kafka/KafkaConsumerCache.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.kafka;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.http.annotation.NotThreadSafe;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Consumer;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.MetricsSet;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.constants.CambriaConstants;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.ConfigurationReader;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.ajsc.filemonitor.AJSCPropertiesMap;

/**
 * A cache of live KafkaConsumer objects, keyed by topic, consumer group, and
 * client id, with consumer ownership coordinated across API nodes through
 * ZooKeeper. Not thread-safe on its own; it is expected to be used within
 * KafkaConsumerFactory, which must serialize access to it.
 *
 * @author author
 */
@NotThreadSafe
public class KafkaConsumerCache {
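
    // Illustrative lifecycle sketch: how a caller such as KafkaConsumerFactory
    // might drive this cache. The curator, apiId, metrics, topic, group, and
    // clientId names below are assumed to exist in the caller.
    //
    //   KafkaConsumerCache cache = new KafkaConsumerCache(apiId, metrics);
    //   cache.startCache(CambriaConstants.DMAAP, curator);
    //   KafkaConsumer kc = cache.getConsumerFor(topic, group, clientId);
    //   if (kc == null) {
    //       // build a new KafkaConsumer for this client, then:
    //       cache.putConsumerFor(topic, group, clientId, kc);
    //       cache.signalOwnership(topic, group, clientId);
    //   }
    //   // ... serve requests; the sweeper expires idle consumers ...
    //   cache.stopCache();
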
    private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
    private static final int kDefault_ConsumerHandoverWaitMs = 500;

    private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
    private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";

    private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
    private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
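
    // The cambria.consumer.cache.* settings above are read at runtime from
    // the message router properties via AJSCPropertiesMap; the kDefault_*
    // values apply when a property is not set.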

    // Kafka defaults to timing out a client after 6 seconds of inactivity,
    // but it heartbeats even when the client isn't fetching. Here, we don't
    // want to prematurely rebalance the consumer group. Assuming clients hit
    // the server at least every 30 seconds, timing out after 2 minutes should
    // be okay.
    // FIXME: consider allowing the client to specify its expected call rate?
    private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;

    // check for expirations pretty regularly
    private static final long kDefault_SweepEverySeconds = 15;

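    // Cache lifecycle relative to the ZooKeeper session: NOT_STARTED before
    // startCache() / after stopCache(), CONNECTED while the session is live,
    // SUSPENDED while the connection is interrupted, and DISCONNECTED once it
    // is lost.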
    private enum Status {
        NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
    }

    /**
     * User-defined exception class for the Kafka consumer cache.
     *
     * @author author
     */
    public static class KafkaConsumerCacheException extends Exception {
        private static final long serialVersionUID = 1L;

        /**
         * Wraps an underlying cause.
         *
         * @param t the cause
         */
        KafkaConsumerCacheException(Throwable t) {
            super(t);
        }

        /**
         * Creates an exception with an error message.
         *
         * @param s the message
         */
        public KafkaConsumerCacheException(String s) {
            super(s);
        }
    }

    /**
     * Creates a KafkaConsumerCache object. Before it is used, you must call
     * startCache().
     *
     * @param apiId
     * @param metrics
     */
    public KafkaConsumerCache(String apiId, MetricsSet metrics) {

        if (apiId == null) {
            throw new IllegalArgumentException("API Node ID must be specified.");
        }

        fApiId = apiId;
        fMetrics = metrics;

        // Read the ZK base path from the properties, falling back to the
        // default when the property is absent.
        String zkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_ZkBasePath);
        if (zkBasePath == null) {
            zkBasePath = kDefault_ZkBasePath;
        }
        fBaseZkPath = zkBasePath;

        fConsumers = new ConcurrentHashMap<String, KafkaConsumer>();
        fSweepScheduler = Executors.newScheduledThreadPool(1);

        curatorConsumerCache = null;

        status = Status.NOT_STARTED;

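        // Map Curator connection-state changes onto the cache lifecycle:
        // a LOST connection clears the cache, SUSPENDED pauses it, and
        // RECONNECTED re-synchronizes the local cache against ZooKeeper.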
        listener = new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.LOST) {
                    log.info("ZooKeeper connection expired");
                    handleConnectionLoss();
                } else if (newState == ConnectionState.READ_ONLY) {
                    log.warn("ZooKeeper connection set to read only mode.");
                } else if (newState == ConnectionState.RECONNECTED) {
                    log.info("ZooKeeper connection re-established");
                    handleReconnection();
                } else if (newState == ConnectionState.SUSPENDED) {
                    log.warn("ZooKeeper connection has been suspended.");
                    handleConnectionSuspended();
                }
            }
        };
    }

    /**
     * Start the cache service. This must be called before any get/put
     * operations.
     *
     * @param mode
     *            DMAAP or CAMBRIA
     * @param curator
     * @throws KafkaConsumerCacheException
     */
    public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
        try {
            // Obtain the Curator client according to the deployment mode.
            if (mode != null && mode.equals(CambriaConstants.CAMBRIA)) {
                curator = ConfigurationReader.getCurator();
            } else if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
                curator = getCuratorFramework(curator);
            }

            curator.getConnectionStateListenable().addListener(listener);

            setStatus(Status.CONNECTED);

            curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
            curatorConsumerCache.start();

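            // Each consumer is represented by a child znode of fBaseZkPath
            // whose name is the consumer key and whose data is the owning API
            // node's id; watching these children lets every node react when a
            // consumer is started, claimed by a peer, or removed.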
            curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                    case CHILD_ADDED: {
                        final String apiId = new String(event.getData().getData());
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        log.info(apiId + " started consumer " + consumer);
                        break;
                    }
                    case CHILD_UPDATED: {
                        final String apiId = new String(event.getData().getData());
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        if (fConsumers.containsKey(consumer)) {
                            log.info(apiId + " claimed consumer " + consumer + " from " + fApiId);

                            dropClaimedConsumer(consumer);
                        }

                        break;
                    }
                    case CHILD_REMOVED: {
                        final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

                        if (fConsumers.containsKey(consumer)) {
                            log.info("Someone wanted consumer " + consumer + " gone; removing it from the cache");
                            dropConsumer(consumer, false);
                        }

                        break;
                    }
                    default:
                        break;
                    }
                }
            });

            // initialize the ZK path
            final EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
            ensurePath.ensure(curator.getZookeeperClient());

            long freq = kDefault_SweepEverySeconds;
            final String sweepFreqStr = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    kSetting_SweepEverySeconds);
            if (sweepFreqStr != null) {
                freq = Long.parseLong(sweepFreqStr);
            }

            fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
            log.info("KafkaConsumerCache started");
            log.info("sweeping cached clients every " + freq + " seconds");
        } catch (ZkException e) {
            throw new KafkaConsumerCacheException(e);
        } catch (Exception e) {
            throw new KafkaConsumerCacheException(e);
        }
    }

    /**
     * Gets a started Curator client: if the given instance is still latent,
     * starts it and blocks until the ZooKeeper connection is established.
     *
     * @param curator
     * @return the curator object
     */
    public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
        if (curator.getState() == CuratorFrameworkState.LATENT) {
            curator.start();

            try {
                curator.blockUntilConnected();
            } catch (InterruptedException e) {
                log.error("error while starting the curator framework: " + e.getMessage());
                // restore the interrupt flag rather than swallowing it
                Thread.currentThread().interrupt();
            }
        }

        return curator;
    }

    /**
     * Stop the cache service.
     */
    public void stopCache() {
        setStatus(Status.DISCONNECTED);

        final CuratorFramework curator = ConfigurationReader.getCurator();

        if (curator != null) {
            try {
                curator.getConnectionStateListenable().removeListener(listener);
                curatorConsumerCache.close();
                log.info("Curator client closed");
            } catch (ZkInterruptedException e) {
                log.warn("Curator client close interrupted: " + e.getMessage());
            } catch (IOException e) {
                log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache: " + e.getMessage());
            }

            curatorConsumerCache = null;
        }

        if (fSweepScheduler != null) {
            fSweepScheduler.shutdownNow();
            log.info("cache sweeper stopped");
        }

        if (fConsumers != null) {
            fConsumers.clear();
            fConsumers = null;
        }

        setStatus(Status.NOT_STARTED);

        log.info("Consumer cache service stopped");
    }

    /**
     * Get a cached consumer by topic, group, and id, if it exists (and remains
     * valid). In addition, this method waits for all other consumer caches in
     * the cluster to release their ownership and delete their version of this
     * consumer.
     *
     * @param topic
     * @param consumerGroupId
     * @param clientId
     * @return a consumer, or null
     */
    public KafkaConsumer getConsumerFor(String topic, String consumerGroupId, String clientId)
            throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");

        final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
        final KafkaConsumer kc = fConsumers.get(consumerKey);

        if (kc != null) {
            log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
            kc.touch();
            fMetrics.onKafkaConsumerCacheHit();
        } else {
            log.debug("Consumer cache miss for [" + consumerKey + "]");
            fMetrics.onKafkaConsumerCacheMiss();
        }

        return kc;
    }

    /**
     * Put a consumer into the cache by topic, group and ID
     *
     * @param topic
     * @param consumerGroupId
     * @param consumerId
     * @param consumer
     * @throws KafkaConsumerCacheException
     */
    public void putConsumerFor(String topic, String consumerGroupId, String consumerId, KafkaConsumer consumer)
            throws KafkaConsumerCacheException {
        if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
            throw new KafkaConsumerCacheException("The cache service is unavailable.");

        final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
        fConsumers.put(consumerKey, consumer);
    }

    /**
     * @return a snapshot of the currently cached consumers
     */
    public Collection<? extends Consumer> getConsumers() {
        return new LinkedList<KafkaConsumer>(fConsumers.values());
    }

    /**
     * Drops all consumers from the cache.
     */
    public void dropAllConsumers() {
        for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
            dropConsumer(entry.getKey(), true);
        }

        // consumers should be empty here
        if (!fConsumers.isEmpty()) {
            log.warn("During dropAllConsumers, the consumer map is not empty.");
            fConsumers.clear();
        }
    }

    /**
     * Drop a consumer from our cache due to a timeout
     *
     * @param key
     */
    private void dropTimedOutConsumer(String key) {
        fMetrics.onKafkaConsumerTimeout();

        if (!fConsumers.containsKey(key)) {
            log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
            return;
        }

        // First, drop this consumer from our cache
        dropConsumer(key, true);

        final CuratorFramework curator = ConfigurationReader.getCurator();

        try {
            curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
        } catch (NoNodeException e) {
            log.warn("A consumer was deleted from " + fApiId
                    + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
        } catch (Exception e) {
            log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
        }

        log.info("Dropped " + key + " consumer due to timeout");
    }

    /**
     * Drop a consumer from our cache due to another API node claiming it as
     * their own.
     *
     * @param key
     */
    private void dropClaimedConsumer(String key) {
        // if the consumer is still in our cache, it implies a claim.
        if (fConsumers.containsKey(key)) {
            fMetrics.onKafkaConsumerClaimed();
            log.info("Consumer [" + key + "] claimed by another node.");
        }

        dropConsumer(key, false);
    }

    /**
     * Removes the consumer from the cache and closes its connection to the
     * kafka broker(s).
     *
     * @param key
     * @param dueToTimeout
     */
    private void dropConsumer(String key, boolean dueToTimeout) {
        final KafkaConsumer kc = fConsumers.remove(key);

        if (kc != null) {
            log.info("closing Kafka consumer " + key);
            kc.close();
        }
    }

    private final MetricsSet fMetrics;
    private final String fBaseZkPath;
    private final ScheduledExecutorService fSweepScheduler;
    private final String fApiId;
    private final ConnectionStateListener listener;

    private ConcurrentHashMap<String, KafkaConsumer> fConsumers;
    private PathChildrenCache curatorConsumerCache;

    private volatile Status status;

    private void handleReconnection() {

        log.info("Reading current cache data from ZK and synchronizing local cache");

        final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();

        // Remove all the consumers in this API node's cache that now belong
        // to other API nodes.
        for (ChildData cachedConsumer : cacheData) {
            final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
            final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
                    : "undefined";

            if (!fApiId.equals(owningApiId)) {
                fConsumers.remove(consumerId);
            }
        }

        setStatus(Status.CONNECTED);
    }

    private void handleConnectionSuspended() {
        log.info("Suspending cache until ZK connection is re-established");

        setStatus(Status.SUSPENDED);
    }

    private void handleConnectionLoss() {
        log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");

        setStatus(Status.DISCONNECTED);

        closeAllCachedConsumers();
        fConsumers.clear();
    }

    private void closeAllCachedConsumers() {
        for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
            entry.getValue().close();
        }
    }

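    /**
     * Builds the cache key for a consumer, "topic::consumerGroupId::clientId".
     * The same key is used as the child znode name under fBaseZkPath.
     */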
    private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
        return topic + "::" + consumerGroupId + "::" + clientId;
    }

    /**
     * Claims ownership of a consumer by writing this API node's id to the
     * consumer's znode at <base>/<topic>::<consumerGroupId>::<consumerId>.
     *
     * @param topic
     * @param consumerGroupId
     * @param consumerId
     * @throws KafkaConsumerCacheException
     */
    public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
            throws KafkaConsumerCacheException {
        final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);

        try {
            final String consumerPath = fBaseZkPath + "/" + consumerKey;

            log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);

            final CuratorFramework curator = ConfigurationReader.getCurator();

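            // Update-then-create: overwrite the znode data with this node's
            // id; if the znode does not exist yet, create it along with any
            // missing parents. Peers observe this as CHILD_UPDATED or
            // CHILD_ADDED and drop their claim on the consumer.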
            try {
                curator.setData().forPath(consumerPath, fApiId.getBytes());
            } catch (KeeperException.NoNodeException e) {
                curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
            }

            log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
        } catch (Exception e) {
            log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
            throw new KafkaConsumerCacheException(e);
        }

        log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");

        try {
            // Read the configured handover wait, falling back to the default.
            int handoverWaitMs = kDefault_ConsumerHandoverWaitMs;
            final String handoverWaitStr = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    kSetting_ConsumerHandoverWaitMs);
            if (handoverWaitStr != null) {
                handoverWaitMs = Integer.parseInt(handoverWaitStr);
            }

            Thread.sleep(handoverWaitMs);
        } catch (InterruptedException e) {
            // restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
        }
    }

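    /**
     * Expires idle consumers: any consumer whose last touch is older than the
     * configured window (cambria.consumer.cache.touchFreqMs, two minutes by
     * default) is dropped from the cache and its znode deleted.
     */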
    private void sweep() {
        final LinkedList<String> removals = new LinkedList<String>();

        long mustTouchEveryMs = kDefault_MustTouchEveryMs;
        final String touchEveryMsStr = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_TouchEveryMs);
        if (touchEveryMsStr != null) {
            mustTouchEveryMs = Long.parseLong(touchEveryMsStr);
        }

        final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;

        for (Entry<String, KafkaConsumer> e : fConsumers.entrySet()) {
            final long lastTouchMs = e.getValue().getLastTouch();

            log.debug("consumer " + e.getKey() + " last touched at " + lastTouchMs);

            if (lastTouchMs < oldestAllowedTouchMs) {
                log.info("consumer " + e.getKey() + " has expired");
                removals.add(e.getKey());
            }
        }

        for (String key : removals) {
            dropTimedOutConsumer(key);
        }
    }

    /**
     * Runnable that invokes sweep() on the sweep schedule.
     *
     * @author author
     */
    private class sweeper implements Runnable {
        /**
         * run method
         */
        public void run() {
            sweep();
        }
    }

    /**
     * Drops a consumer from the local cache, closing its connection to the
     * Kafka broker(s).
     *
     * @param topic
     * @param consumerGroup
     * @param clientId
     */
    public void dropConsumer(String topic, String consumerGroup, String clientId) {
        dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
    }

    private Status getStatus() {
        return this.status;
    }

    private void setStatus(Status status) {
        this.status = status;
    }

    private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
}