1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.kafka;
24 import java.io.IOException;
25 import java.util.Collection;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
34 import org.I0Itec.zkclient.exception.ZkException;
35 import org.I0Itec.zkclient.exception.ZkInterruptedException;
36 import org.apache.curator.framework.CuratorFramework;
37 import org.apache.curator.framework.imps.CuratorFrameworkState;
38 import org.apache.curator.framework.recipes.cache.ChildData;
39 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
40 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
41 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
42 import org.apache.curator.framework.state.ConnectionState;
43 import org.apache.curator.framework.state.ConnectionStateListener;
44 import org.apache.curator.utils.EnsurePath;
45 import org.apache.curator.utils.ZKPaths;
46 import org.apache.http.annotation.NotThreadSafe;
47 import org.apache.zookeeper.KeeperException;
48 import org.apache.zookeeper.KeeperException.NoNodeException;
49 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Consumer;
50 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.MetricsSet;
51 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.constants.CambriaConstants;
52 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.ConfigurationReader;
54 //import org.slf4j.Logger;
55 //import org.slf4j.LoggerFactory;
56 import com.att.eelf.configuration.EELFLogger;
57 import com.att.eelf.configuration.EELFManager;
58 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
59 import com.att.nsa.drumlin.till.nv.rrNvReadable;
62 * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which
// Cache of KafkaConsumer instances keyed by "topic::group::clientId", with
// cross-node ownership coordination through ZooKeeper/Curator.
// NOTE(review): this view of the file is missing many original lines (see the
// gaps in the inline numbering), so several declarations below are visibly
// truncated.
68 public class KafkaConsumerCache {
// Settings keys read from AJSCPropertiesMap, each paired with a hard-coded default.
70 private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs";
71 private static final int kDefault_ConsumerHandoverWaitMs = 500;
73 private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds";
74 private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs";
76 private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath";
77 private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache";
79 // kafka defaults to timing out a client after 6 seconds of inactivity, but
80 // it heartbeats even when the client isn't fetching. Here, we don't
81 // want to prematurely rebalance the consumer group. Assuming clients are
83 // the server at least every 30 seconds, timing out after 2 minutes should
85 // FIXME: consider allowing the client to specify its expected call rate?
86 private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
88 // check for expirations pretty regularly
89 private static final long kDefault_SweepEverySeconds = 15;
// Lifecycle states of the cache; read/written via getStatus()/setStatus().
// NOTE(review): the enclosing "enum Status {" declaration line is not visible
// in this view.
92 NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
96 * User defined exception class for kafka consumer cache
// Checked exception wrapping any failure inside the cache service; callers of
// startCache/getConsumerFor/putConsumerFor/signalOwnership catch this type.
101 public class KafkaConsumerCacheException extends Exception {
103 * To throw the exception
// Cause-wrapping constructor (body not visible in this view).
107 KafkaConsumerCacheException(Throwable t) {
// Message-only constructor (body not visible in this view).
115 public KafkaConsumerCacheException(String s) {
119 private static final long serialVersionUID = 1L;
123 * Creates a KafkaConsumerCache object. Before it is used, you must call
// startCache() before any get/put operation; the cache starts NOT_STARTED.
130 public KafkaConsumerCache(String apiId, MetricsSet metrics) {
// Reject a missing API node id. NOTE(review): the guarding null-check line
// itself is not visible in this truncated view.
133 throw new IllegalArgumentException("API Node ID must be specified.");
// Resolve the ZK base path from configuration, falling back to the default.
139 String strkSetting_ZkBasePath= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_ZkBasePath);
140 if(null==strkSetting_ZkBasePath)strkSetting_ZkBasePath = kDefault_ZkBasePath;
141 fBaseZkPath = strkSetting_ZkBasePath;
142 fConsumers = new ConcurrentHashMap<String, KafkaConsumer>();
169 * Start the cache service. This must be called before any get/put
175 * @throws IOException
176 * @throws KafkaConsumerCacheException
// Connects the cache to ZooKeeper: registers the connection-state listener,
// mirrors the consumer-ownership znodes via a PathChildrenCache, ensures the
// base path exists, and schedules the periodic expiry sweeper.
178 public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException {
181 // CuratorFramework curator = null;
183 // Changed the class from where we are initializing the curator
// Choose the curator source based on deployment mode (CAMBRIA vs DMAAP).
185 if (mode != null && mode.equals(CambriaConstants.CAMBRIA)) {
186 curator = ConfigurationReader.getCurator();
187 } else if (mode != null && mode.equals(CambriaConstants.DMAAP)) {
188 curator = getCuratorFramework(curator);
// Track ZK connection transitions so the cache can suspend/recover.
191 curator.getConnectionStateListenable().addListener(listener);
193 setStatus(Status.CONNECTED);
// Local mirror of the znodes under fBaseZkPath (true = cache node data too).
195 curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true);
196 curatorConsumerCache.start();
198 curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() {
199 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
200 switch (event.getType()) {
// NOTE(review): the case label preceding these lines (presumably
// CHILD_ADDED) is not visible in this truncated view.
202 final String apiId = new String(event.getData().getData());
203 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
205 log.info(apiId + " started consumer " + consumer);
// Another API node rewrote the znode data: it is claiming this consumer.
208 case CHILD_UPDATED: {
209 final String apiId = new String(event.getData().getData());
210 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
212 if (fConsumers.containsKey(consumer)) {
213 log.info(apiId + " claimed consumer " + consumer + " from " + fApiId);
215 dropClaimedConsumer(consumer);
// The znode is gone: evict the local copy without touching ZK again (false).
220 case CHILD_REMOVED: {
221 final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath());
223 if (fConsumers.containsKey(consumer)) {
224 log.info("Someone wanted consumer " + consumer + " gone; removing it from the cache")
225 dropConsumer(consumer, false);
236 // initialize the ZK path
237 EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
238 ensurePath.ensure(curator.getZookeeperClient());
240 //final long freq = fSettings.getLong(kSetting_SweepEverySeconds, kDefault_SweepEverySeconds);
// Sweep frequency: configurable, defaulting to kDefault_SweepEverySeconds.
241 long freq = kDefault_SweepEverySeconds;
242 String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_SweepEverySeconds);
243 if(null==strkSetting_SweepEverySeconds) strkSetting_SweepEverySeconds = kDefault_SweepEverySeconds+"";
245 freq = Long.parseLong(strkSetting_SweepEverySeconds);
247 fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS);
248 log.info("KafkaConsumerCache started");
249 log.info("sweeping cached clients every " + freq + " seconds");
// Both ZK-specific and generic failures surface as the cache's own exception.
250 } catch (ZkException e) {
251 throw new KafkaConsumerCacheException(e);
252 } catch (Exception e) {
253 throw new KafkaConsumerCacheException(e);
258 * Getting the curator object to start the zookeeper connection (established)
261 * @return curator object
// Starts the curator client if it is still LATENT and blocks until the ZK
// connection is up; interruption is only logged, not rethrown.
263 public static CuratorFramework getCuratorFramework(CuratorFramework curator) {
264 if (curator.getState() == CuratorFrameworkState.LATENT) {
// NOTE(review): curator.start() and the enclosing try are not visible here.
268 curator.blockUntilConnected();
269 } catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored; the full source should
// call Thread.currentThread().interrupt() here.
271 log.error("error while setting curator framework :" + e.getMessage());
279 * Stop the cache service.
// Detaches the ZK listener, closes the PathChildrenCache, stops the sweeper,
// and resets the cache to NOT_STARTED so startCache() can be called again.
281 public void stopCache() {
282 setStatus(Status.DISCONNECTED);
284 final CuratorFramework curator = ConfigurationReader.getCurator();
286 if (curator != null) {
288 curator.getConnectionStateListenable().removeListener(listener);
289 curatorConsumerCache.close();
290 log.info("Curator client closed");
291 } catch (ZkInterruptedException e) {
292 log.warn("Curator client close interrupted: " + e.getMessage());
293 } catch (IOException e) {
294 log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
297 curatorConsumerCache = null;
300 if (fSweepScheduler != null) {
301 fSweepScheduler.shutdownNow();
302 log.info("cache sweeper stopped");
// NOTE(review): the consumer-map cleanup body is not visible in this view.
305 if (fConsumers != null) {
310 setStatus(Status.NOT_STARTED);
312 log.info("Consumer cache service stopped");
316 * Get a cached consumer by topic, group, and id, if it exists (and remains
317 * valid) In addition, this method waits for all other consumer caches in
318 * the cluster to release their ownership and delete their version of this
322 * @param consumerGroupId
324 * @return a consumer, or null
// Looks up the consumer under the "topic::group::clientId" key and records a
// cache hit or miss in the metrics; requires the cache to be CONNECTED.
326 public KafkaConsumer getConsumerFor(String topic, String consumerGroupId, String clientId)
327 throws KafkaConsumerCacheException {
328 if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
329 throw new KafkaConsumerCacheException("The cache service is unavailable.");
331 final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId);
332 final KafkaConsumer kc = fConsumers.get(consumerKey);
// NOTE(review): the null check branching hit vs. miss is not visible here.
335 log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch());
337 fMetrics.onKafkaConsumerCacheHit();
339 log.debug("Consumer cache miss for [" + consumerKey + "]");
340 fMetrics.onKafkaConsumerCacheMiss();
347 * Put a consumer into the cache by topic, group and ID
350 * @param consumerGroupId
353 * @throws KafkaConsumerCacheException
// Stores the consumer under "topic::group::consumerId"; requires CONNECTED.
355 public void putConsumerFor(String topic, String consumerGroupId, String consumerId, KafkaConsumer consumer)
356 throws KafkaConsumerCacheException {
357 if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
358 throw new KafkaConsumerCacheException("The cache service is unavailable.");
360 final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
361 fConsumers.put(consumerKey, consumer);
// Returns a snapshot copy of the cached consumers; callers cannot mutate the
// cache through the returned list.
364 public Collection<? extends Consumer> getConsumers() {
365 return new LinkedList<KafkaConsumer>(fConsumers.values());
369 * This method is to drop all the consumer
// Drops every cached consumer (treated like a timeout: dueToTimeout=true),
// then warns if anything re-populated the map concurrently.
371 public void dropAllConsumers() {
372 for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
373 dropConsumer(entry.getKey(), true);
376 // consumers should be empty here
377 if (fConsumers.size() > 0) {
378 log.warn("During dropAllConsumers, the consumer map is not empty.");
384 * Drop a consumer from our cache due to a timeout
// Records the timeout metric, removes the consumer locally, then deletes its
// ownership znode so other nodes see it disappear.
388 private void dropTimedOutConsumer(String key) {
389 fMetrics.onKafkaConsumerTimeout();
390 if (!fConsumers.containsKey(key)) {
392 log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key);
396 // First, drop this consumer from our cache
397 dropConsumer(key, true);
// guaranteed() retries the delete in the background if it initially fails.
399 final CuratorFramework curator = ConfigurationReader.getCurator();
402 curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key);
403 } catch (NoNodeException e) {
404 log.warn("A consumer was deleted from " + fApiId
405 + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
406 } catch (Exception e) {
407 log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
410 log.info("Dropped " + key + " consumer due to timeout");
414 * Drop a consumer from our cache due to another API node claiming it as
// Evicts a consumer another API node has taken over; does NOT touch ZK
// (dueToTimeout=false) because the claimer now owns the znode.
419 private void dropClaimedConsumer(String key) {
420 // if the consumer is still in our cache, it implies a claim.
421 if (fConsumers.containsKey(key)) {
422 fMetrics.onKafkaConsumerClaimed();
423 log.info("Consumer [" + key + "] claimed by another node.");
426 dropConsumer(key, false);
430 * Removes the consumer from the cache and closes its connection to the
434 * @param dueToTimeout
// Removes the entry from the local map and closes the Kafka connection.
// NOTE(review): the null check and close() call are not visible in this view.
436 private void dropConsumer(String key, boolean dueToTimeout) {
437 final KafkaConsumer kc = fConsumers.remove(key);
440 log.info("closing Kafka consumer " + key);
445 // private final rrNvReadable fSettings;
446 private final MetricsSet fMetrics;
// Root znode under which per-consumer ownership nodes live.
447 private final String fBaseZkPath;
// Single-thread scheduler running sweep() at a fixed rate (see startCache).
448 private final ScheduledExecutorService fSweepScheduler;
// This API node's identity, written to znodes to signal ownership.
449 private final String fApiId;
450 private final ConnectionStateListener listener;
// Local cache: "topic::group::clientId" -> consumer. Concurrent because the
// sweeper, Curator event threads, and request threads all touch it.
452 private ConcurrentHashMap<String, KafkaConsumer> fConsumers;
453 private PathChildrenCache curatorConsumerCache;
// volatile: status is read/written from ZK listener and request threads.
455 private volatile Status status;
// After a ZK reconnect, re-sync: evict any local consumer whose znode is now
// owned by a different API node, then mark the cache CONNECTED again.
457 private void handleReconnection() {
459 log.info("Reading current cache data from ZK and synchronizing local cache");
461 final List<ChildData> cacheData = curatorConsumerCache.getCurrentData();
463 // Remove all the consumers in this API nodes cache that now belong to
465 for (ChildData cachedConsumer : cacheData) {
466 final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath());
// The znode data holds the owning node's apiId (null-guarded; the else
// branch of this ternary is not visible in this view).
467 final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData())
470 if (!fApiId.equals(owningApiId)) {
471 fConsumers.remove(consumerId);
475 setStatus(Status.CONNECTED);
// ZK connection suspended: pause the cache (get/put will reject requests)
// until RECONNECTED or LOST fires.
478 private void handleConnectionSuspended() {
479 log.info("Suspending cache until ZK connection is re-established");
481 setStatus(Status.SUSPENDED);
// ZK session lost: we can no longer prove ownership, so mark DISCONNECTED and
// close every cached Kafka consumer on this node.
484 private void handleConnectionLoss() {
485 log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)");
487 setStatus(Status.DISCONNECTED);
489 closeAllCachedConsumers();
// Closes every cached consumer's Kafka connection. Note: entries are not
// removed from fConsumers here — callers handle eviction separately.
493 private void closeAllCachedConsumers() {
494 for (Entry<String, KafkaConsumer> entry : fConsumers.entrySet()) {
495 entry.getValue().close();
// Canonical cache/znode key: "topic::consumerGroupId::clientId".
499 private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) {
500 return topic + "::" + consumerGroupId + "::" + clientId;
504 * This method is to get a lock
507 * @param consumerGroupId
509 * @throws KafkaConsumerCacheException
// Claims ownership of a consumer by writing this node's apiId into the
// consumer's znode (creating it if absent), then backs off briefly so the
// Kafka broker can clean up the previous owner's ZK state.
511 public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId)
512 throws KafkaConsumerCacheException {
513 // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
514 final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
517 final String consumerPath = fBaseZkPath + "/" + consumerKey;
519 log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
521 final CuratorFramework curator = ConfigurationReader.getCurator();
// Try setData first; fall back to create-with-parents when the node is new.
524 curator.setData().forPath(consumerPath, fApiId.getBytes());
525 } catch (KeeperException.NoNodeException e) {
526 curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
529 log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
530 } catch (Exception e) {
531 log.error(fApiId + " failed to claim ownership of consumer " + consumerKey);
532 throw new KafkaConsumerCacheException(e);
535 log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer");
// NOTE(review): this local int shadows the static String settings key of the
// same name, so the property lookup below passes "500" (the default, as a
// string) as the key instead of "cambria.consumer.cache.handoverWaitMs".
// The configured handover wait is therefore never read. Rename the local
// (e.g. consumerHandoverWaitMs) in the full source.
538 int kSetting_ConsumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs;
539 String strkSetting_ConsumerHandoverWaitMs= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_ConsumerHandoverWaitMs+"");
540 if(strkSetting_ConsumerHandoverWaitMs!=null) kSetting_ConsumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
542 Thread.sleep(kSetting_ConsumerHandoverWaitMs);
543 //Thread.sleep(fSettings.getInt(kSetting_ConsumerHandoverWaitMs, kDefault_ConsumerHandoverWaitMs));
// NOTE(review): handler body not visible; ensure the interrupt flag is
// restored (Thread.currentThread().interrupt()) in the full source.
544 } catch (InterruptedException e) {
// Periodic expiry pass (run by the sweeper Runnable): collects consumers not
// touched within mustTouchEveryMs, then drops each as timed out. Removals are
// gathered first to avoid mutating fConsumers mid-iteration.
549 private void sweep() {
550 final LinkedList<String> removals = new LinkedList<String>();
// Touch timeout: configurable via kSetting_TouchEveryMs, default 2 minutes.
551 long mustTouchEveryMs = kDefault_MustTouchEveryMs;
552 String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_TouchEveryMs);
553 //if(null!=strkSetting_TouchEveryMs) strkSetting_TouchEveryMs = kDefault_MustTouchEveryMs+"";
554 if(null!=strkSetting_TouchEveryMs)
556 mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
559 //final long mustTouchEveryMs = fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
560 final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
562 for (Entry<String, KafkaConsumer> e : fConsumers.entrySet()) {
563 final long lastTouchMs = e.getValue().getLastTouch();
565 log.debug("consumer " + e.getKey() + " last touched at " + lastTouchMs);
567 if (lastTouchMs < oldestAllowedTouchMs) {
568 log.info("consumer " + e.getKey() + " has expired");
569 removals.add(e.getKey());
573 for (String key : removals) {
574 dropTimedOutConsumer(key);
579 * Creating a thread to run the sweep method
// Runnable scheduled at a fixed rate by startCache(); its run() body (not
// visible in this view) presumably delegates to sweep() — TODO confirm.
584 private class sweeper implements Runnable {
594 * This method is to drop consumer
597 * @param consumerGroup
// Public eviction entry point: builds the canonical key and drops the
// consumer without treating it as a timeout (no ZK znode deletion).
600 public void dropConsumer(String topic, String consumerGroup, String clientId) {
601 dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false);
// Accessors for the volatile status field (body of getStatus not visible).
604 private Status getStatus() {
608 private void setStatus(Status status) {
609 this.status = status;
// EELF logger for this class (the previous SLF4J logger is kept commented).
612 private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
613 //private static final Logger log = LoggerFactory.getLogger(KafkaConsumerCache.class);