1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import java.io.IOException;
25 import java.net.InetAddress;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Enumeration;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map.Entry;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.TimeUnit;
37 import javax.annotation.Resource;
39 import org.I0Itec.zkclient.exception.ZkException;
40 import org.I0Itec.zkclient.exception.ZkInterruptedException;
41 import org.apache.curator.framework.CuratorFramework;
42 import org.apache.curator.framework.imps.CuratorFrameworkState;
43 import org.apache.curator.framework.recipes.cache.ChildData;
44 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
45 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
46 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
47 import org.apache.curator.framework.state.ConnectionState;
48 import org.apache.curator.framework.state.ConnectionStateListener;
49 import org.apache.curator.utils.EnsurePath;
50 import org.apache.curator.utils.ZKPaths;
51 import org.apache.http.annotation.NotThreadSafe;
52 import org.apache.zookeeper.KeeperException;
53 import org.apache.zookeeper.KeeperException.NoNodeException;
54 import org.springframework.beans.factory.annotation.Autowired;
55 import org.springframework.beans.factory.annotation.Qualifier;
56 import org.springframework.context.annotation.ComponentScan;
58 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
59 import org.onap.dmaap.dmf.mr.backends.Consumer;
60 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
61 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
63 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
66 import com.att.eelf.configuration.EELFLogger;
67 import com.att.eelf.configuration.EELFManager;
68 import com.att.nsa.metrics.CdmTimer;
71 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
77 public class KafkaConsumerCache {
// Process-wide singleton instance, created lazily on first getInstance() call.
79 private static KafkaConsumerCache kafkaconscache = null;
// Returns the singleton cache.
// NOTE(review): lazy init is unsynchronized — two threads racing the first call
// could each construct an instance; confirm first use is single-threaded.
81 public static KafkaConsumerCache getInstance() {
82 if (kafkaconscache == null)
83 kafkaconscache = new KafkaConsumerCache();
85 return kafkaconscache;
// Property key / default for how long (ms) to pause after a consumer handover.
88 private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
89 private static final int kDefault_ConsumerHandoverWaitMs = 500;
// Property keys for the sweep frequency (seconds) and client touch frequency (ms).
91 private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
92 private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
// Property key / default for the ZooKeeper base path under which consumer
// ownership nodes are created.
94 private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
95 private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
97 // kafka defaults to timing out a client after 6 seconds of inactivity, but
98 // it heartbeats even when the client isn't fetching. Here, we don't
99 // want to prematurely rebalance the consumer group. Assuming clients are
101 // the server at least every 30 seconds, timing out after 2 minutes should
103 // FIXME: consider allowing the client to specify its expected call rate?
// Default idle timeout before a cached consumer is considered expired: 2 minutes.
104 private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
106 // check for expirations pretty regularly
107 private static final long kDefault_SweepEverySeconds = 15;
// Lifecycle states of the cache service, driven by startCache/stopCache and
// the ZooKeeper connection-state callbacks below.
109 private enum Status {
110 NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
// Error-message bean; injection mechanism not visible in this view.
117 private DMaaPErrorMessages errorMessages;
121 * User defined exception class for kafka consumer cache
123 * @author nilanjana.maity
// Checked exception thrown by cache operations when the service is
// unavailable or a ZooKeeper interaction fails.
126 public class KafkaConsumerCacheException extends Exception {
128 * To throw the exception
// Wraps an underlying cause (e.g. a ZkException from startCache).
132 KafkaConsumerCacheException(Throwable t) {
// Message-only form used for state errors such as "cache service unavailable".
140 public KafkaConsumerCacheException(String s) {
144 private static final long serialVersionUID = 1L;
148 * Creates a KafkaConsumerCache object. Before it is used, you must call
// Reads the ZK base path from configuration (falling back to the default),
// initializes the consumer map and sweep scheduler, and builds the
// ZooKeeper connection-state listener. No ZK connection is made here;
// that happens in startCache().
155 public KafkaConsumerCache() {
157 String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
158 kSetting_ZkBasePath);
159 if (null == strkSetting_ZkBasePath)
160 strkSetting_ZkBasePath = kDefault_ZkBasePath;
161 fBaseZkPath = strkSetting_ZkBasePath;
163 fConsumers = new ConcurrentHashMap<>();
164 fSweepScheduler = Executors.newScheduledThreadPool(1);
166 curatorConsumerCache = null;
168 status = Status.NOT_STARTED;
169 // Watcher for consumer rebalancing across nodes. Kafka011 rebalancing
// Maps Curator connection-state transitions onto the handlers below.
172 listener = new ConnectionStateListener() {
173 public void stateChanged(CuratorFramework client, ConnectionState newState) {
174 if (newState == ConnectionState.LOST) {
176 log.info("ZooKeeper connection expired");
// Session lost: drop to DISCONNECTED and close all cached consumers.
177 handleConnectionLoss();
178 } else if (newState == ConnectionState.READ_ONLY) {
179 log.warn("ZooKeeper connection set to read only mode.");
180 } else if (newState == ConnectionState.RECONNECTED) {
181 log.info("ZooKeeper connection re-established");
// Re-sync local cache against ZK ownership data, then go CONNECTED.
182 handleReconnection();
183 } else if (newState == ConnectionState.SUSPENDED) {
184 log.warn("ZooKeeper connection has been suspended.");
185 handleConnectionSuspended();
192 * Start the cache service. This must be called before any get/put
198 * @throws IOException
199 * @throws KafkaConsumerCacheException if ZK setup fails or fApiId is unset
// In DMAAP mode: attaches the connection-state listener, starts a
// PathChildrenCache on fBaseZkPath to watch cross-node consumer ownership,
// ensures the base ZK path exists, and schedules the periodic sweeper.
201 public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
// fApiId must have been set via setfApiId() before starting.
203 if (fApiId == null) {
204 throw new IllegalArgumentException("API Node ID must be specified.");
209 if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
// Blocks until the curator client is connected if it is still LATENT.
210 curator = getCuratorFramework(curator);
212 curator.getConnectionStateListenable().addListener(listener);
213 setStatus(Status.CONNECTED);
214 curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
215 curatorConsumerCache.start();
// React to ownership changes made by other API nodes under fBaseZkPath.
216 curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
217 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
218 switch (event.getType()) {
// Node data carries the claiming API node's id; node name is the consumer key.
221 final String apiId = new String(event.getData().getData());
222 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
224 log.info(apiId + " started consumer " + consumer);
225 } catch (Exception ex) {
226 log.info("#Error Occured during Adding child" + ex);
230 case CHILD_UPDATED: {
231 final String apiId = new String(event.getData().getData());
232 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
// Another node updated the claim for a consumer we hold locally.
234 if (fConsumers.containsKey(consumer)) {
235 log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
236 + " but wont hand over");
// Intentionally not handing over the connection until the
// active node is running for this client.
240 dropClaimedConsumer(consumer);
245 case CHILD_REMOVED: {
246 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
248 if (fConsumers.containsKey(consumer)) {
249 log.info("Someone wanted consumer " + consumer
250 + " gone; but not removing it from the cache");
// false = not a timeout; see dropConsumer(String, boolean).
251 dropConsumer(consumer, false);
263 // initialize the ZK path
// NOTE(review): EnsurePath is deprecated in newer Curator versions in
// favor of CuratorFramework.create().creatingParentContainersIfNeeded().
264 EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
265 ensurePath.ensure(curator.getZookeeperClient());
// Sweep frequency: configurable, defaults to kDefault_SweepEverySeconds.
269 long freq = kDefault_SweepEverySeconds;
270 String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
271 kSetting_SweepEverySeconds);
272 if (null != strkSetting_SweepEverySeconds) {
273 freq = Long.parseLong(strkSetting_SweepEverySeconds);
276 fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
277 log.info("KafkaConsumerCache started");
278 log.info("sweeping cached clients every " + freq + " seconds");
279 } catch (ZkException e) {
280 log.error("@@@@@@ ZK Exception occured for " + e);
281 throw new KafkaConsumerCacheException(e);
282 } catch (Exception e) {
283 log.error("@@@@@@ Exception occured for " + e);
284 throw new KafkaConsumerCacheException(e);
289 * Gets the curator object, ensuring the ZooKeeper connection is established
292 * @return curator object (same instance passed in)
// If the client has not been started/connected yet (LATENT), block until
// connected; on interruption, restore the interrupt flag and return anyway.
294 public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
295 if (curator.getState() == CuratorFrameworkState.LATENT) {
299 curator.blockUntilConnected();
300 } catch (InterruptedException e) {
301 log.error("error while setting curator framework :",e);
// Re-assert the interrupt so callers can observe it.
302 Thread.currentThread().interrupt();
310 * Stop the cache service: detach the ZK listener, close the child cache,
// shut down the sweeper, and reset status to NOT_STARTED.
312 public void stopCache() {
313 setStatus(Status.DISCONNECTED);
315 final CuratorFramework curator = ConfigurationReader.getCurator();
317 if (curator != null) {
319 curator.getConnectionStateListenable().removeListener(listener);
// NOTE(review): NPE possible here if startCache never ran in DMAAP mode
// (curatorConsumerCache would still be null) — confirm call ordering.
320 curatorConsumerCache.close();
321 log.info("Curator client closed");
322 } catch (ZkInterruptedException e) {
323 log.warn("Curator client close interrupted: " + e.getMessage());
324 } catch (IOException e) {
325 log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
328 curatorConsumerCache = null;
331 if (fSweepScheduler != null) {
// shutdownNow: don't wait for an in-flight sweep to finish.
332 fSweepScheduler.shutdownNow();
333 log.info("cache sweeper stopped");
336 if (fConsumers != null) {
341 setStatus(Status.NOT_STARTED);
343 log.info("Consumer cache service stopped");
347 * Get a cached consumer by topic, group, and id, if it exists (and remains
348 * valid) In addition, this method waits for all other consumer caches in
349 * the cluster to release their ownership and delete their version of this
353 * @param consumerGroupId
355 * @return a consumer, or null
// Looks up fConsumers by the "topic::group::client" key and records a
// cache hit/miss metric. Throws if the cache is not CONNECTED.
357 public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
358 throws KafkaConsumerCacheException {
359 if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
360 throw new KafkaConsumerCacheException("The cache service is unavailable.");
362 final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
363 final Kafka011Consumer kc = fConsumers.get(consumerKey);
366 log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
368 fMetrics.onKafkaConsumerCacheHit();
370 log.debug("Consumer cache miss for [" + consumerKey + "]");
371 fMetrics.onKafkaConsumerCacheMiss();
378 * Get a cached consumer by topic, group, and id, if it exists (and remains
379 * valid) In addition, this method waits for all other consumer caches in
380 * the cluster to release their ownership and delete their version of this
384 * @param consumerGroupId
386 * @return a consumer, or null
// Collects every cached consumer whose key starts with the given
// "topic::group" prefix, EXCLUDING the entry for the given clientId.
388 public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
389 throws KafkaConsumerCacheException {
390 if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
391 throw new KafkaConsumerCacheException("The cache service is unavailable.");
392 ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
// Iterate a snapshot of the ConcurrentHashMap's keys.
395 Enumeration<String> strEnum = fConsumers.keys();
396 String consumerLocalKey = null;
397 while (strEnum.hasMoreElements()) {
398 consumerLocalKey = strEnum.nextElement();
// Keys have the form topic::group::clientId (see makeConsumerKey).
400 if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
405 kcl.add(fConsumers.get(consumerLocalKey));
// Collects every cached consumer whose key starts with the given prefix
// (expected "topic::group"). Throws if the cache is not CONNECTED.
413 public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
414 if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
415 throw new KafkaConsumerCacheException("The cache service is unavailable.");
416 ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
418 Enumeration<String> strEnum = fConsumers.keys();
419 String consumerLocalKey = null;
420 while (strEnum.hasMoreElements()) {
421 consumerLocalKey = strEnum.nextElement();
423 if (consumerLocalKey.startsWith(group)) {
426 kcl.add(fConsumers.get(consumerLocalKey));
435 * Put a consumer into the cache by topic, group and ID
438 * @param consumerGroupId
441 * @throws KafkaConsumerCacheException if the cache is not CONNECTED
// Stores the consumer under the "topic::group::client" key, replacing any
// existing entry for that key.
443 public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
444 throws KafkaConsumerCacheException {
445 if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
446 throw new KafkaConsumerCacheException("The cache service is unavailable.");
448 final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
449 fConsumers.put(consumerKey, consumer);
453 log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
// Returns a snapshot copy of the cached consumers; mutating the returned
// list does not affect the cache.
456 public Collection<? extends Consumer> getConsumers() {
457 return new LinkedList<>(fConsumers.values());
461 * This method is to drop all the consumers currently in the cache.
// Drops each entry as a timeout-style removal (dueToTimeout = true),
// then warns if anything is still left in the map.
463 public void dropAllConsumers() {
464 for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
465 dropConsumer(entry.getKey(), true);
468 // consumers should be empty here
469 if (fConsumers.size() > 0) {
470 log.warn("During dropAllConsumers, the consumer map is not empty.");
476 * Drop a consumer from our cache due to a timeout
// Records the timeout metric, removes the consumer locally, deletes its
// ownership node from ZooKeeper, then pauses briefly to give the broker
// time to clean up before another node claims the consumer.
480 private void dropTimedOutConsumer(String key) {
481 fMetrics.onKafkaConsumerTimeout();
483 if (!fConsumers.containsKey(key)) {
484 log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
488 // First, drop this consumer from our cache
489 boolean isdrop = dropConsumer(key, true);
// Then remove the ZK ownership node; guaranteed() retries the delete
// across connection problems.
493 final CuratorFramework curator = ConfigurationReader.getCurator();
496 curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
497 log.info(" ^ deleted " + fBaseZkPath + "/" + key);
498 } catch (NoNodeException e) {
// Benign: no API node had claimed this consumer in ZK.
499 log.warn("A consumer was deleted from " + fApiId
500 + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
501 } catch (Exception e) {
502 log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
503 log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage());
// Handover pause, configurable via cambria.consumer.cache.handoverWaitMs.
507 int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
508 String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
509 kSetting_ConsumerHandoverWaitMs);
510 if (strkSetting_ConsumerHandoverWaitMs != null)
511 consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
512 Thread.sleep(consumerHandoverWaitMs);
513 } catch (InterruptedException e) {
514 log.error("InterruptedException in dropTimedOutConsumer",e);
515 Thread.currentThread().interrupt();
517 log.info("Dropped " + key + " consumer due to timeout");
521 * Drop a consumer from our cache due to another API node claiming it as
// its own. Records the claim metric, then removes the local entry without
// treating it as a timeout (dueToTimeout = false).
526 private void dropClaimedConsumer(String key) {
527 // if the consumer is still in our cache, it implies a claim.
528 if (fConsumers.containsKey(key)) {
529 fMetrics.onKafkaConsumerClaimed();
530 log.info("Consumer [" + key + "] claimed by another node.");
532 log.info("^dropping claimed Kafka consumer " + key);
533 dropConsumer(key, false);
537 * Removes the consumer from the cache and closes its connection to the
541 * @param dueToTimeout whether the removal was caused by an idle timeout
// Fetches the entry, logs it, and removes it from the local map.
543 private boolean dropConsumer(String key, boolean dueToTimeout) {
544 final Kafka011Consumer kc = fConsumers.get(key);
545 log.info("closing Kafka consumer " + key + " object " + kc);
549 fConsumers.remove(key);
558 // private final rrNvReadable fSettings;
// Metrics sink for cache hit/miss/timeout/claim counters; set via setfMetrics.
559 private MetricsSet fMetrics;
// ZK base path under which per-consumer ownership nodes live.
560 private final String fBaseZkPath;
// Single-threaded scheduler driving the periodic sweep() of expired consumers.
561 private final ScheduledExecutorService fSweepScheduler;
// This API node's identifier, written into ZK ownership nodes; set via setfApiId.
562 private String fApiId;
564 public void setfMetrics(final MetricsSet metrics) {
565 this.fMetrics = metrics;
568 public void setfApiId(final String id) {
// ZooKeeper connection-state listener built in the constructor.
572 private final ConnectionStateListener listener;
// Local cache of live consumers keyed by "topic::group::clientId".
574 private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
// Curator child cache watching fBaseZkPath for cross-node ownership events.
575 private PathChildrenCache curatorConsumerCache;
// Current service state; volatile so listener threads see updates.
577 private volatile Status status;
// After a ZK reconnect: re-read the ownership data from the child cache and
// evict any local consumers now owned by a different API node, then return
// the service to CONNECTED.
579 private void handleReconnection() {
581 log.info("Reading current cache data from ZK and synchronizing local cache");
582 final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
583 // Remove all the consumers in this API nodes cache that now belong to
585 for (ChildData cachedConsumer : cacheData) {
586 final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
// Node payload is the owning API node's id (may be absent).
587 final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
589 if (!fApiId.equals(owningApiId)) {
590 fConsumers.remove(consumerId); // Commented to avoid removing
591 // the value cache hashmap but the lock still exists.
592 // This is not considered in kafka consumer Factory
593 log.info("@@@ Validating current cache data from ZK and synchronizing local cache" + owningApiId
594 + " removing " + consumerId);
598 setStatus(Status.CONNECTED);
// ZK connection suspended: mark the cache SUSPENDED so get/put calls fail
// fast until the connection is re-established.
601 private void handleConnectionSuspended() {
602 log.info("Suspending cache until ZK connection is re-established");
604 setStatus(Status.SUSPENDED);
// ZK session lost: mark DISCONNECTED and close every cached Kafka consumer
// on this node.
607 private void handleConnectionLoss() {
608 log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
610 setStatus(Status.DISCONNECTED);
612 closeAllCachedConsumers();
// Best-effort close of every cached consumer; a failure on one entry is
// logged and does not stop the loop.
616 private void closeAllCachedConsumers() {
617 for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
619 entry.getValue().close();
620 } catch (Exception e) {
621 log.info("@@@@@@ Error occurd while closing Clearing All cache " + e);
// Builds the canonical cache/ZK key: "topic::consumerGroupId::clientId".
// The "::" separator is also relied on by getConsumerListForCG's endsWith check.
626 private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
627 return topic + "::" + consumerGroupId + "::" + clientId;
631 * This method is to get a lock
634 * @param consumerGroupId
636 * @throws KafkaConsumerCacheException if the ZK write fails
// Claims ownership of a consumer by writing this node's fApiId into the
// consumer's ZK node (creating it, with parents, if absent), then backs off
// for the configured handover wait.
638 public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
639 throws KafkaConsumerCacheException {
640 // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
641 final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
643 try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
644 final String consumerPath = fBaseZkPath + "/" + consumerKey;
645 log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
646 final CuratorFramework curator = ConfigurationReader.getCurator();
// Prefer updating an existing node; fall back to create-with-parents
// when the node does not exist yet.
649 curator.setData().forPath(consumerPath, fApiId.getBytes());
650 } catch (KeeperException.NoNodeException e) {
651 curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
653 log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
655 } catch (Exception e) {
656 log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
657 throw new KafkaConsumerCacheException(e);
660 log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
// Same configurable handover pause as dropTimedOutConsumer.
663 int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
664 String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
665 kSetting_ConsumerHandoverWaitMs);
666 if (strkSetting_ConsumerHandoverWaitMs != null)
667 consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
668 Thread.sleep(consumerHandoverWaitMs);
669 } catch (InterruptedException e) {
670 log.error("InterruptedException in signalOwnership",e);
671 Thread.currentThread().interrupt();
675 public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
// Periodic expiry pass (run by the sweeper Runnable): any consumer not
// touched within mustTouchEveryMs is collected and dropped as timed out.
679 public void sweep() {
680 final LinkedList<String> removals = new LinkedList<String>();
// Idle threshold: configurable via cambria.consumer.cache.touchFreqMs,
// defaulting to 2 minutes.
681 long mustTouchEveryMs = kDefault_MustTouchEveryMs;
682 String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
683 kSetting_TouchEveryMs);
684 if (null != strkSetting_TouchEveryMs) {
685 mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
689 final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
// Collect first, drop second — avoids mutating fConsumers mid-iteration.
691 for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
692 final long lastTouchMs = e.getValue().getLastTouch();
693 log.debug("consumer #####1" + e.getKey() + " " + lastTouchMs + " < " + oldestAllowedTouchMs);
695 if (lastTouchMs < oldestAllowedTouchMs) {
696 log.info("consumer " + e.getKey() + " has expired");
697 removals.add(e.getKey());
701 for (String key : removals) {
702 dropTimedOutConsumer(key);
707 * Runnable wrapper that invokes sweep(); scheduled at a fixed rate by
709 * fSweepScheduler in startCache().
710 * @author nilanjana.maity
712 private class sweeper implements Runnable {
722 * This method is to drop a consumer identified by topic/group/client.
725 * @param consumerGroup
// Public convenience overload: builds the canonical key and delegates to
// the private dropConsumer(key, false) — not treated as a timeout.
728 public void dropConsumer(String topic, String consumerGroup, String clientId) {
729 dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
// Read the current service state (volatile read).
732 private Status getStatus() {
// Update the service state (volatile write, visible to listener threads).
736 private void setStatus(Status status) {
737 this.status = status;
// EELF logger shared by all methods of this class.
740 private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);