1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import com.att.aft.dme2.internal.springframework.beans.factory.annotation.Autowired;
25 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import com.att.nsa.metrics.CdmTimer;
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Enumeration;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.Map.Entry;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.TimeUnit;
40 import org.I0Itec.zkclient.exception.ZkException;
41 import org.I0Itec.zkclient.exception.ZkInterruptedException;
42 import org.apache.curator.framework.CuratorFramework;
43 import org.apache.curator.framework.imps.CuratorFrameworkState;
44 import org.apache.curator.framework.recipes.cache.ChildData;
45 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
46 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
47 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
48 import org.apache.curator.framework.state.ConnectionState;
49 import org.apache.curator.framework.state.ConnectionStateListener;
50 import org.apache.curator.utils.EnsurePath;
51 import org.apache.curator.utils.ZKPaths;
52 import org.apache.http.annotation.NotThreadSafe;
53 import org.apache.zookeeper.KeeperException.NoNodeException;
54 import org.onap.dmaap.dmf.mr.backends.Consumer;
55 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
56 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
57 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
58 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
61 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
67 public class KafkaConsumerCache {
69 private static KafkaConsumerCache kafkaconscache = null;
71 public static KafkaConsumerCache getInstance() {
72 if (kafkaconscache == null)
73 kafkaconscache = new KafkaConsumerCache();
75 return kafkaconscache;
78 private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
79 private static final int kDefault_ConsumerHandoverWaitMs = 500;
81 private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
82 private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
84 private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
85 private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
87 // kafka defaults to timing out a client after 6 seconds of inactivity, but
88 // it heartbeats even when the client isn't fetching. Here, we don't
89 // want to prematurely rebalance the consumer group. Assuming clients are
91 // the server at least every 30 seconds, timing out after 2 minutes should
93 // FIXME: consider allowing the client to specify its expected call rate?
94 private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
96 // check for expirations pretty regularly
97 private static final long kDefault_SweepEverySeconds = 15;
100 NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
107 private DMaaPErrorMessages errorMessages;
111 * User defined exception class for kafka consumer cache
113 * @author nilanjana.maity
116 public class KafkaConsumerCacheException extends Exception {
118 * To throw the exception
122 KafkaConsumerCacheException(Throwable t) {
130 public KafkaConsumerCacheException(String s) {
134 private static final long serialVersionUID = 1L;
/**
 * Creates a KafkaConsumerCache object. Before it is used, you must call
 * startCache(); until then the cache is NOT_STARTED and refuses get/put.
 * <p>
 * Reads the ZooKeeper base path from the message-router properties (falling
 * back to the default path), creates an empty consumer map and a single-thread
 * sweeper scheduler, and prepares (but does not register) the ZooKeeper
 * connection-state listener used for consumer rebalancing across nodes
 * (a Kafka011 rebalancing work-around).
 */
public KafkaConsumerCache() {
    final String configuredBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
            kSetting_ZkBasePath);
    fBaseZkPath = (configuredBasePath != null) ? configuredBasePath : kDefault_ZkBasePath;

    fConsumers = new ConcurrentHashMap<>();
    fSweepScheduler = Executors.newScheduledThreadPool(1);
    curatorConsumerCache = null;
    status = Status.NOT_STARTED;

    listener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == null) {
                return;
            }
            switch (newState) {
                case LOST:
                    log.info("ZooKeeper connection expired");
                    handleConnectionLoss();
                    break;
                case READ_ONLY:
                    log.warn("ZooKeeper connection set to read only mode.");
                    break;
                case RECONNECTED:
                    log.info("ZooKeeper connection re-established");
                    handleReconnection();
                    break;
                case SUSPENDED:
                    log.warn("ZooKeeper connection has been suspended.");
                    handleConnectionSuspended();
                    break;
                default:
                    // other states need no action
                    break;
            }
        }
    };
}
179 * Start the cache service. This must be called before any get/put
185 * @throws IOException
186 * @throws KafkaConsumerCacheException
188 public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
190 if (fApiId == null) {
191 throw new IllegalArgumentException("API Node ID must be specified.");
196 if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
197 curator = getCuratorFramework(curator);
199 curator.getConnectionStateListenable().addListener(listener);
200 setStatus(Status.CONNECTED);
201 curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
202 curatorConsumerCache.start();
203 curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
204 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
205 switch (event.getType()) {
208 final String apiId = new String(event.getData().getData());
209 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
211 log.info(apiId + " started consumer " + consumer);
212 } catch (Exception ex) {
213 log.info("#Error Occured during Adding child" + ex);
217 case CHILD_UPDATED: {
218 final String apiId = new String(event.getData().getData());
219 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
221 if (fConsumers.containsKey(consumer)) {
222 log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
223 + " but wont hand over");
224 // Commented so that it dont give the connection
225 // until the active node is running for this client
227 dropClaimedConsumer(consumer);
232 case CHILD_REMOVED: {
233 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
235 if (fConsumers.containsKey(consumer)) {
236 log.info("Someone wanted consumer " + consumer
237 + " gone; but not removing it from the cache");
238 dropConsumer(consumer, false);
250 // initialize the ZK path
251 EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
252 ensurePath.ensure(curator.getZookeeperClient());
256 long freq = kDefault_SweepEverySeconds;
257 String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
258 kSetting_SweepEverySeconds);
259 if (null != strkSetting_SweepEverySeconds) {
260 freq = Long.parseLong(strkSetting_SweepEverySeconds);
263 fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
264 log.info("KafkaConsumerCache started");
265 log.info("sweeping cached clients every " + freq + " seconds");
266 } catch (ZkException e) {
267 log.error("@@@@@@ ZK Exception occured for " + e);
268 throw new KafkaConsumerCacheException(e);
269 } catch (Exception e) {
270 log.error("@@@@@@ Exception occured for " + e);
271 throw new KafkaConsumerCacheException(e);
276 * Getting the curator oject to start the zookeeper connection estabished
279 * @return curator object
281 public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
282 if (curator.getState() == CuratorFrameworkState.LATENT) {
286 curator.blockUntilConnected();
287 } catch (InterruptedException e) {
288 log.error("error while setting curator framework :",e);
289 Thread.currentThread().interrupt();
297 * Stop the cache service.
299 public void stopCache() {
300 setStatus(Status.DISCONNECTED);
302 final CuratorFramework curator = ConfigurationReader.getCurator();
304 if (curator != null) {
306 curator.getConnectionStateListenable().removeListener(listener);
307 curatorConsumerCache.close();
308 log.info("Curator client closed");
309 } catch (ZkInterruptedException e) {
310 log.warn("Curator client close interrupted: ", e);
311 } catch (IOException e) {
312 log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
315 curatorConsumerCache = null;
318 if (fSweepScheduler != null) {
319 fSweepScheduler.shutdownNow();
320 log.info("cache sweeper stopped");
323 if (fConsumers != null) {
328 setStatus(Status.NOT_STARTED);
330 log.info("Consumer cache service stopped");
334 * Get a cached consumer by topic, group, and id, if it exists (and remains
335 * valid) In addition, this method waits for all other consumer caches in
336 * the cluster to release their ownership and delete their version of this
340 * @param consumerGroupId
342 * @return a consumer, or null
344 public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
345 throws KafkaConsumerCacheException {
346 if (getStatus() != Status.CONNECTED)
347 throw new KafkaConsumerCacheException("The cache service is unavailable.");
349 final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
350 final Kafka011Consumer kc = fConsumers.get(consumerKey);
353 log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
355 fMetrics.onKafkaConsumerCacheHit();
357 log.debug("Consumer cache miss for [" + consumerKey + "]");
358 fMetrics.onKafkaConsumerCacheMiss();
365 * Get a cached consumer by topic, group, and id, if it exists (and remains
366 * valid) In addition, this method waits for all other consumer caches in
367 * the cluster to release their ownership and delete their version of this
372 * @return a consumer, or null
374 public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
375 throws KafkaConsumerCacheException {
376 if (getStatus() != Status.CONNECTED)
377 throw new KafkaConsumerCacheException("The cache service is unavailable.");
378 ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
381 Enumeration<String> strEnum = fConsumers.keys();
382 String consumerLocalKey = null;
383 while (strEnum.hasMoreElements()) {
384 consumerLocalKey = strEnum.nextElement();
386 if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
391 kcl.add(fConsumers.get(consumerLocalKey));
399 public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
400 if (getStatus() != Status.CONNECTED)
401 throw new KafkaConsumerCacheException("The cache service is unavailable.");
402 ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
404 Enumeration<String> strEnum = fConsumers.keys();
405 String consumerLocalKey = null;
406 while (strEnum.hasMoreElements()) {
407 consumerLocalKey = strEnum.nextElement();
409 if (consumerLocalKey.startsWith(group)) {
412 kcl.add(fConsumers.get(consumerLocalKey));
421 * Put a consumer into the cache by topic, group and ID
424 * @param consumerGroupId
427 * @throws KafkaConsumerCacheException
429 public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
430 throws KafkaConsumerCacheException {
431 if (getStatus() != Status.CONNECTED)
432 throw new KafkaConsumerCacheException("The cache service is unavailable.");
434 final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
435 fConsumers.put(consumerKey, consumer);
439 log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
442 public Collection<? extends Consumer> getConsumers() {
443 return new LinkedList<>(fConsumers.values());
447 * This method is to drop all the consumer
449 public void dropAllConsumers() {
450 for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
451 dropConsumer(entry.getKey(), true);
454 // consumers should be empty here
455 if (fConsumers.size() > 0) {
456 log.warn("During dropAllConsumers, the consumer map is not empty.");
462 * Drop a consumer from our cache due to a timeout
466 private void dropTimedOutConsumer(String key) {
467 fMetrics.onKafkaConsumerTimeout();
469 if (!fConsumers.containsKey(key)) {
470 log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
474 // First, drop this consumer from our cache
475 boolean isdrop = dropConsumer(key, true);
479 final CuratorFramework curator = ConfigurationReader.getCurator();
482 curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
483 log.info(" ^ deleted " + fBaseZkPath + "/" + key);
484 } catch (NoNodeException e) {
485 log.warn("A consumer was deleted from " + fApiId
486 + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
487 } catch (Exception e) {
488 log.debug("Unexpected exception while deleting consumer: ", e);
489 log.info(" %%%%%%@# Unexpected exception while deleting consumer: ", e);
493 int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
494 String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
495 kSetting_ConsumerHandoverWaitMs);
496 if (strkSetting_ConsumerHandoverWaitMs != null)
497 consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
498 Thread.sleep(consumerHandoverWaitMs);
499 } catch (InterruptedException e) {
500 log.error("InterruptedException in dropTimedOutConsumer",e);
501 Thread.currentThread().interrupt();
503 log.info("Dropped " + key + " consumer due to timeout");
507 * Drop a consumer from our cache due to another API node claiming it as
512 private void dropClaimedConsumer(String key) {
513 // if the consumer is still in our cache, it implies a claim.
514 if (fConsumers.containsKey(key)) {
515 fMetrics.onKafkaConsumerClaimed();
516 log.info("Consumer [" + key + "] claimed by another node.");
518 log.info("^dropping claimed Kafka consumer " + key);
519 dropConsumer(key, false);
523 * Removes the consumer from the cache and closes its connection to the
527 * @param dueToTimeout
529 private boolean dropConsumer(String key, boolean dueToTimeout) {
530 final Kafka011Consumer kc = fConsumers.get(key);
531 log.info("closing Kafka consumer " + key + " object " + kc);
535 fConsumers.remove(key);
544 // private final rrNvReadable fSettings;
545 private MetricsSet fMetrics;
546 private final String fBaseZkPath;
547 private final ScheduledExecutorService fSweepScheduler;
548 private String fApiId;
550 public void setfMetrics(final MetricsSet metrics) {
551 this.fMetrics = metrics;
554 public void setfApiId(final String id) {
558 private final ConnectionStateListener listener;
560 private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
561 private PathChildrenCache curatorConsumerCache;
563 private volatile Status status;
565 private void handleReconnection() {
567 log.info("Reading current cache data from ZK and synchronizing local cache");
568 final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
569 // Remove all the consumers in this API nodes cache that now belong to
571 for (ChildData cachedConsumer : cacheData) {
572 final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
573 final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
575 if (!fApiId.equals(owningApiId)) {
576 fConsumers.remove(consumerId); // Commented to avoid removing
577 // the value cache hashmap but the lock still exists.
578 // This is not considered in kafka consumer Factory
579 log.info("@@@ Validating current cache data from ZK and synchronizing local cache" + owningApiId
580 + " removing " + consumerId);
584 setStatus(Status.CONNECTED);
587 private void handleConnectionSuspended() {
588 log.info("Suspending cache until ZK connection is re-established");
590 setStatus(Status.SUSPENDED);
593 private void handleConnectionLoss() {
594 log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
596 setStatus(Status.DISCONNECTED);
598 closeAllCachedConsumers();
602 private void closeAllCachedConsumers() {
603 for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
605 entry.getValue().close();
606 } catch (Exception e) {
607 log.info("@@@@@@ Error occurd while closing Clearing All cache " + e);
612 private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
613 return topic + "::" + consumerGroupId + "::" + clientId;
617 * This method is to get a lock
620 * @param consumerGroupId
622 * @throws KafkaConsumerCacheException
624 public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
625 throws KafkaConsumerCacheException {
626 // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
627 final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
629 try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
630 final String consumerPath = fBaseZkPath + "/" + consumerKey;
631 log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
632 final CuratorFramework curator = ConfigurationReader.getCurator();
635 curator.setData().forPath(consumerPath, fApiId.getBytes());
636 } catch (NoNodeException e) {
637 log.info("KeeperException.NoNodeException occured", e);
638 curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
640 log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
642 } catch (Exception e) {
643 log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
644 throw new KafkaConsumerCacheException(e);
647 log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
650 int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
651 String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
652 kSetting_ConsumerHandoverWaitMs);
653 if (strkSetting_ConsumerHandoverWaitMs != null)
654 consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
655 Thread.sleep(consumerHandoverWaitMs);
656 } catch (InterruptedException e) {
657 log.error("InterruptedException in signalOwnership",e);
658 //Thread.currentThread().interrupt();
662 public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
666 public void sweep() {
667 final LinkedList<String> removals = new LinkedList<String>();
668 long mustTouchEveryMs = kDefault_MustTouchEveryMs;
669 String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
670 kSetting_TouchEveryMs);
671 if (null != strkSetting_TouchEveryMs) {
672 mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
676 final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
678 for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
679 final long lastTouchMs = e.getValue().getLastTouch();
680 log.debug("consumer #####1" + e.getKey() + " " + lastTouchMs + " < " + oldestAllowedTouchMs);
682 if (lastTouchMs < oldestAllowedTouchMs) {
683 log.info("consumer " + e.getKey() + " has expired");
684 removals.add(e.getKey());
688 for (String key : removals) {
689 dropTimedOutConsumer(key);
694 * Creating a thread to run the sweep method
696 * @author nilanjana.maity
699 private class sweeper implements Runnable {
709 * This method is to drop consumer
712 * @param consumerGroup
715 public void dropConsumer(String topic, String consumerGroup, String clientId) {
716 dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
719 private Status getStatus() {
723 private void setStatus(Status status) {
724 this.status = status;
727 private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);