/*******************************************************************************
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.http.annotation.NotThreadSafe;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.springframework.beans.factory.annotation.Autowired;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;

import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.backends.MetricsSet;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;

/**
 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
 *                synchronizes access to the cache.
 *
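 * <p>
 * A minimal usage sketch (illustrative only; assumes the curator client comes
 * from ConfigurationReader and that "api-node-1" names this API node):
 * </p>
 *
 * <pre>{@code
 * KafkaConsumerCache cache = KafkaConsumerCache.getInstance();
 * cache.setfMetrics(metrics); // a MetricsSet implementation
 * cache.setfApiId("api-node-1");
 * cache.startCache(CambriaConstants.DMAAP, ConfigurationReader.getCurator());
 *
 * Kafka011Consumer kc = cache.getConsumerFor("myTopic", "myGroup", "client-1");
 * if (kc == null) {
 *     // build a new Kafka011Consumer, then register it:
 *     // cache.putConsumerFor("myTopic", "myGroup", "client-1", consumer);
 * }
 *
 * cache.stopCache();
 * }</pre>
 *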
 */
@NotThreadSafe
public class KafkaConsumerCache {

	private static KafkaConsumerCache kafkaconscache = null;

	public static KafkaConsumerCache getInstance() {
		if (kafkaconscache == null)
			kafkaconscache = new KafkaConsumerCache();

		return kafkaconscache;
	}

	private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
	private static final int kDefault_ConsumerHandoverWaitMs = 500;

	private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
	private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";

	private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
	private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
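
	// These settings are read from the message-router property file via
	// AJSCPropertiesMap. Illustrative overrides (the values shown mirror the
	// compiled-in defaults):
	//   cambria.consumer.cache.handoverWaitMs=500
	//   cambria.consumer.cache.sweepFreqSeconds=15
	//   cambria.consumer.cache.touchFreqMs=120000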

	// Kafka defaults to timing out a client after 6 seconds of inactivity, but
	// it heartbeats even when the client isn't fetching. Here, we don't
	// want to prematurely rebalance the consumer group. Assuming clients are
	// hitting the server at least every 30 seconds, timing out after 2 minutes
	// should be safe.
	// FIXME: consider allowing the client to specify its expected call rate?
	private static final long kDefault_MustTouchEveryMs = 1000L * 60 * 2;

	// check for expirations pretty regularly
	private static final long kDefault_SweepEverySeconds = 15;

	private enum Status {
		NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
	}

	@Autowired
	private DMaaPErrorMessages errorMessages;

	/**
	 * User-defined exception class for the Kafka consumer cache.
	 *
	 * @author nilanjana.maity
	 */
	public class KafkaConsumerCacheException extends Exception {
		/**
		 * To throw the exception with a cause.
		 */
		KafkaConsumerCacheException(Throwable t) {
			super(t);
		}

		public KafkaConsumerCacheException(String s) {
			super(s);
		}

		private static final long serialVersionUID = 1L;
	}

	/**
	 * Creates a KafkaConsumerCache object. Before it is used, you must call
	 * startCache().
	 */
	public KafkaConsumerCache() {

		String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSetting_ZkBasePath);
		if (null == strkSetting_ZkBasePath)
			strkSetting_ZkBasePath = kDefault_ZkBasePath;
		fBaseZkPath = strkSetting_ZkBasePath;

		fConsumers = new ConcurrentHashMap<>();
		fSweepScheduler = Executors.newScheduledThreadPool(1);

		curatorConsumerCache = null;

		status = Status.NOT_STARTED;

		// Connection state watcher used for consumer rebalancing across nodes
		// (Kafka 0.11).
		listener = new ConnectionStateListener() {
			public void stateChanged(CuratorFramework client, ConnectionState newState) {
				if (newState == ConnectionState.LOST) {
					log.info("ZooKeeper connection expired");
					handleConnectionLoss();
				} else if (newState == ConnectionState.READ_ONLY) {
					log.warn("ZooKeeper connection set to read only mode.");
				} else if (newState == ConnectionState.RECONNECTED) {
					log.info("ZooKeeper connection re-established");
					handleReconnection();
				} else if (newState == ConnectionState.SUSPENDED) {
					log.warn("ZooKeeper connection has been suspended.");
					handleConnectionSuspended();
				}
			}
		};
	}

	/**
	 * Start the cache service. This must be called before any get/put
	 * operations.
	 *
	 * @param mode
	 * @param curator
	 * @throws KafkaConsumerCacheException
	 */
	public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {

		if (fApiId == null) {
			throw new IllegalArgumentException("API Node ID must be specified.");
		}

		try {
			if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
				curator = getCuratorFramework(curator);
			}
			curator.getConnectionStateListenable().addListener(listener);
			setStatus(Status.CONNECTED);
			curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
			curatorConsumerCache.start();
			curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
				public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
					switch (event.getType()) {
					case CHILD_ADDED: {
						try {
							final String apiId = new String(event.getData().getData());
							final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

							log.info(apiId + " started consumer " + consumer);
						} catch (Exception ex) {
							log.info("#Error occurred while adding child: " + ex);
						}
						break;
					}
					case CHILD_UPDATED: {
						final String apiId = new String(event.getData().getData());
						final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

						if (fConsumers.containsKey(consumer)) {
							log.info(apiId + " claimed consumer " + consumer + " from " + fApiId
									+ " but won't hand over");
							// The claimed consumer is only dropped locally; the
							// connection is not handed over while the active
							// node is still serving this client id.
							dropClaimedConsumer(consumer);
						}

						break;
					}
					case CHILD_REMOVED: {
						final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());

						if (fConsumers.containsKey(consumer)) {
							log.info("Someone wanted consumer " + consumer
									+ " gone, but not removing it from the cache");
							dropConsumer(consumer, false);
						}

						break;
					}

					default:
						break;
					}
				}
			});

			// initialize the ZK path
			EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
			ensurePath.ensure(curator.getZookeeperClient());

			long freq = kDefault_SweepEverySeconds;
			String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_SweepEverySeconds);
			if (null != strkSetting_SweepEverySeconds) {
				freq = Long.parseLong(strkSetting_SweepEverySeconds);
			}

			fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
			log.info("KafkaConsumerCache started");
			log.info("sweeping cached clients every " + freq + " seconds");
		} catch (ZkException e) {
			log.error("@@@@@@ ZK exception occurred: ", e);
			throw new KafkaConsumerCacheException(e);
		} catch (Exception e) {
			log.error("@@@@@@ Exception occurred: ", e);
			throw new KafkaConsumerCacheException(e);
		}
	}

	/**
	 * Gets the curator client, starting it and waiting for the ZooKeeper
	 * connection to be established if it has not been started yet.
	 *
	 * @param curator
	 * @return curator object
	 */
290 if (curator.getState() == CuratorFrameworkState.LATENT) {
294 curator.blockUntilConnected();
295 } catch (InterruptedException e) {
296 log.error("error while setting curator framework :",e);
297 Thread.currentThread().interrupt();

	/**
	 * Stop the cache service.
	 */
	public void stopCache() {
		setStatus(Status.DISCONNECTED);

		final CuratorFramework curator = ConfigurationReader.getCurator();

		if (curator != null) {
			try {
				curator.getConnectionStateListenable().removeListener(listener);
				curatorConsumerCache.close();
				log.info("Curator client closed");
			} catch (ZkInterruptedException e) {
				log.warn("Curator client close interrupted: ", e);
			} catch (IOException e) {
				log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache: ", e);
			}

			curatorConsumerCache = null;
		}

		if (fSweepScheduler != null) {
			fSweepScheduler.shutdownNow();
			log.info("cache sweeper stopped");
		}

		if (fConsumers != null) {
			fConsumers.clear();
		}

		setStatus(Status.NOT_STARTED);

		log.info("Consumer cache service stopped");
	}

	/**
	 * Get a cached consumer by topic, group, and id, if it exists (and remains
	 * valid). In addition, this method waits for all other consumer caches in
	 * the cluster to release their ownership and delete their version of this
	 * consumer.
	 *
	 * @param topic
	 * @param consumerGroupId
	 * @param clientId
	 * @return a consumer, or null
	 */
	public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId)
			throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");

		final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
		final Kafka011Consumer kc = fConsumers.get(consumerKey);

		if (kc != null) {
			log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
			kc.touch();
			fMetrics.onKafkaConsumerCacheHit();
		} else {
			log.debug("Consumer cache miss for [" + consumerKey + "]");
			fMetrics.onKafkaConsumerCacheMiss();
		}

		return kc;
	}

	/**
	 * Get the cached consumers for the given topic::group prefix, excluding
	 * the entry for the given client id.
	 *
	 * @param topicgroup
	 * @param clientId
	 * @return the matching consumers, possibly empty
	 */
	public ArrayList<Kafka011Consumer> getConsumerListForCG(String topicgroup, String clientId)
			throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

		Enumeration<String> strEnum = fConsumers.keys();
		String consumerLocalKey = null;
		while (strEnum.hasMoreElements()) {
			consumerLocalKey = strEnum.nextElement();

			if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
				kcl.add(fConsumers.get(consumerLocalKey));
			}
		}

		return kcl;
	}
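
	/**
	 * Get all cached consumers whose cache key begins with the given
	 * topic::group prefix.
	 *
	 * @param group the topic::consumerGroupId prefix to match
	 * @return the matching consumers, possibly empty
	 */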
	public ArrayList<Kafka011Consumer> getConsumerListForCG(String group) throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");
		ArrayList<Kafka011Consumer> kcl = new ArrayList<>();

		Enumeration<String> strEnum = fConsumers.keys();
		String consumerLocalKey = null;
		while (strEnum.hasMoreElements()) {
			consumerLocalKey = strEnum.nextElement();

			if (consumerLocalKey.startsWith(group)) {
				kcl.add(fConsumers.get(consumerLocalKey));
			}
		}

		return kcl;
	}

	/**
	 * Put a consumer into the cache by topic, group, and ID.
	 *
	 * @param topic
	 * @param consumerGroupId
	 * @param consumerId
	 * @param consumer
	 * @throws KafkaConsumerCacheException
	 */
	public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer)
			throws KafkaConsumerCacheException {
		if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
			throw new KafkaConsumerCacheException("The cache service is unavailable.");

		final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
		fConsumers.put(consumerKey, consumer);

		log.info("^@ Consumer added to cache. Key: " + consumerKey + ", ApiId: " + fApiId);
	}
	public Collection<? extends Consumer> getConsumers() {
		return new LinkedList<>(fConsumers.values());
	}

	/**
	 * Drop all consumers from the cache.
	 */
	public void dropAllConsumers() {
		for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
			dropConsumer(entry.getKey(), true);
		}

		// consumers should be empty here
		if (fConsumers.size() > 0) {
			log.warn("During dropAllConsumers, the consumer map is not empty.");
		}
	}

	/**
	 * Drop a consumer from our cache due to a timeout.
	 *
	 * @param key
	 */
	private void dropTimedOutConsumer(String key) {
		fMetrics.onKafkaConsumerTimeout();

		if (!fConsumers.containsKey(key)) {
			log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
			return;
		}

		// First, drop this consumer from our cache
		boolean isdrop = dropConsumer(key, true);
		if (!isdrop) {
			return;
		}

		// Second, delete the consumer's ZK node so other API nodes see it as
		// available again.
		final CuratorFramework curator = ConfigurationReader.getCurator();

		try {
			curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
			log.info(" ^ deleted " + fBaseZkPath + "/" + key);
		} catch (NoNodeException e) {
			log.warn("A consumer was deleted from " + fApiId
					+ "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
		} catch (Exception e) {
			log.warn("Unexpected exception while deleting consumer: ", e);
		}

		try {
			int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
			String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_ConsumerHandoverWaitMs);
			if (strkSetting_ConsumerHandoverWaitMs != null)
				consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
			Thread.sleep(consumerHandoverWaitMs);
		} catch (InterruptedException e) {
			log.error("InterruptedException in dropTimedOutConsumer", e);
			Thread.currentThread().interrupt();
		}

		log.info("Dropped " + key + " consumer due to timeout");
	}

	/**
	 * Drop a consumer from our cache due to another API node claiming it as
	 * its own.
	 *
	 * @param key
	 */
	private void dropClaimedConsumer(String key) {
		// if the consumer is still in our cache, it implies a claim.
		if (fConsumers.containsKey(key)) {
			fMetrics.onKafkaConsumerClaimed();
			log.info("Consumer [" + key + "] claimed by another node.");
			log.info("^dropping claimed Kafka consumer " + key);
			dropConsumer(key, false);
		}
	}

	/**
	 * Removes the consumer from the cache and closes its connection to the
	 * Kafka broker(s).
	 *
	 * @param key
	 * @param dueToTimeout
	 */
	private boolean dropConsumer(String key, boolean dueToTimeout) {
		final Kafka011Consumer kc = fConsumers.get(key);
		log.info("closing Kafka consumer " + key + " object " + kc);
		if (kc != null) {
			// close() fails (returns false) if the consumer is still in use;
			// in that case, leave it in the cache.
			if (kc.close()) {
				fConsumers.remove(key);
			} else {
				return false;
			}
		}
		return true;
	}
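
	// Metrics sink for cache hit/miss/timeout/claim counters; injected via setfMetrics().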
	private MetricsSet fMetrics;
	private final String fBaseZkPath;
	private final ScheduledExecutorService fSweepScheduler;
	private String fApiId;

	public void setfMetrics(final MetricsSet metrics) {
		this.fMetrics = metrics;
	}

	public void setfApiId(final String id) {
		this.fApiId = id;
	}
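
	// Listens for ZooKeeper connection state changes; registered in startCache().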
	private final ConnectionStateListener listener;

	private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
	private PathChildrenCache curatorConsumerCache;

	private volatile Status status;

	private void handleReconnection() {

		log.info("Reading current cache data from ZK and synchronizing local cache");

		final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();

		// Remove all the consumers in this API node's cache that now belong to
		// other API nodes.
		for (ChildData cachedConsumer : cacheData) {
			final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
			final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
					: "undefined";
			if (!fApiId.equals(owningApiId)) {
				// Another API node now owns this consumer; drop the local entry.
				fConsumers.remove(consumerId);
				log.info("@@@ Validating current cache data from ZK and synchronizing local cache: " + owningApiId
						+ " owns " + consumerId + ", removing it");
			}
		}

		setStatus(Status.CONNECTED);
	}

	private void handleConnectionSuspended() {
		log.info("Suspending cache until ZK connection is re-established");

		setStatus(Status.SUSPENDED);
	}

	private void handleConnectionLoss() {
		log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");

		setStatus(Status.DISCONNECTED);

		closeAllCachedConsumers();
	}

	private void closeAllCachedConsumers() {
		for (Entry<String, Kafka011Consumer> entry : fConsumers.entrySet()) {
			try {
				entry.getValue().close();
			} catch (Exception e) {
				log.info("@@@@@@ Error occurred while closing cached consumers: " + e);
			}
		}
	}
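
	/**
	 * Builds the cache key for a consumer: topic::consumerGroupId::clientId.
	 */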
	private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
		return topic + "::" + consumerGroupId + "::" + clientId;
	}

	/**
	 * Claims ownership of a consumer by writing this API node's id to the
	 * consumer's ZooKeeper node.
	 *
	 * @param topic
	 * @param consumerGroupId
	 * @param consumerId
	 * @throws KafkaConsumerCacheException
	 */
	public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
			throws KafkaConsumerCacheException {
		// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
		final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);

		try (final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
			final String consumerPath = fBaseZkPath + "/" + consumerKey;
			log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
			final CuratorFramework curator = ConfigurationReader.getCurator();

			try {
				curator.setData().forPath(consumerPath, fApiId.getBytes());
			} catch (KeeperException.NoNodeException e) {
				log.info("KeeperException.NoNodeException occurred", e);
				curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
			}
			log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
		} catch (Exception e) {
			log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
			throw new KafkaConsumerCacheException(e);
		}

		log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");

		try {
			int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
			String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					kSetting_ConsumerHandoverWaitMs);
			if (strkSetting_ConsumerHandoverWaitMs != null)
				consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
			Thread.sleep(consumerHandoverWaitMs);
		} catch (InterruptedException e) {
			log.error("InterruptedException in signalOwnership", e);
			Thread.currentThread().interrupt();
		}
	}

	public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() {
		// No live-lock avoider is wired into this cache instance.
		return null;
	}
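
	/**
	 * Drop any consumer that has not been touched within the configured idle
	 * window (cambria.consumer.cache.touchFreqMs; default two minutes).
	 */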
	public void sweep() {
		final LinkedList<String> removals = new LinkedList<>();

		long mustTouchEveryMs = kDefault_MustTouchEveryMs;
		String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSetting_TouchEveryMs);
		if (null != strkSetting_TouchEveryMs) {
			mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
		}

		final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;

		for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
			final long lastTouchMs = e.getValue().getLastTouch();
			log.debug("consumer " + e.getKey() + " last touched at " + lastTouchMs + ", expires below "
					+ oldestAllowedTouchMs);

			if (lastTouchMs < oldestAllowedTouchMs) {
				log.info("consumer " + e.getKey() + " has expired");
				removals.add(e.getKey());
			}
		}

		for (String key : removals) {
			dropTimedOutConsumer(key);
		}
	}

	/**
	 * Runnable wrapper that runs the sweep method on a schedule.
	 *
	 * @author nilanjana.maity
	 */
	private class sweeper implements Runnable {
		/**
		 * Run the sweep method.
		 */
		@Override
		public void run() {
			sweep();
		}
	}

	/**
	 * Drop a consumer by topic, consumer group, and client id.
	 *
	 * @param topic
	 * @param consumerGroup
	 * @param clientId
	 */
	public void dropConsumer(String topic, String consumerGroup, String clientId) {
		dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
	}

	private Status getStatus() {
		return this.status;
	}

	private void setStatus(Status status) {
		this.status = status;
	}

	private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
}