1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.beans;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.Properties;
29 import java.util.concurrent.TimeUnit;
31 import org.apache.curator.framework.CuratorFramework;
32 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
33 import org.apache.kafka.clients.consumer.KafkaConsumer;
34 import org.springframework.beans.factory.annotation.Qualifier;
36 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
37 import com.att.dmf.mr.CambriaApiException;
38 import com.att.dmf.mr.backends.Consumer;
39 import com.att.dmf.mr.backends.ConsumerFactory;
40 import com.att.dmf.mr.backends.MetricsSet;
41 import com.att.dmf.mr.backends.kafka.Kafka011Consumer;
42 import com.att.dmf.mr.backends.kafka.Kafka011ConsumerUtil;
43 import com.att.dmf.mr.backends.kafka.KafkaConsumerCache;
44 import com.att.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
45 import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
46 import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
47 import com.att.dmf.mr.constants.CambriaConstants;
48 import com.att.dmf.mr.utils.ConfigurationReader;
49 //import org.slf4j.Logger;
50 //import org.slf4j.LoggerFactory;
51 import com.att.eelf.configuration.EELFLogger;
52 import com.att.eelf.configuration.EELFManager;
53 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
56 * @author nilanjana.maity
59 public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
61 // private static final Logger log = LoggerFactory
62 // .getLogger(DMaaPKafkaConsumerFactory.class);
63 private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
65 // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
66 // KafkaLiveLockAvoider();
/**
 * Constructor: establishes this API node's identity, resolves the Kafka
 * broker list from configuration, and (when caching is enabled) starts the
 * shared consumer cache and registers this node's live-lock avoidance watcher.
 *
 * NOTE(review): lines are elided in this excerpt — several closing braces are
 * missing from the visible text; the code below is kept exactly as visible.
 *
 * @param metrics              metrics sink installed on the consumer cache
 * @param curator              Curator (ZooKeeper) client used to start the cache
 * @param kafkaLiveLockAvoider watcher used to force polls and break rebalance live-locks
 * @throws missingReqdSetting          when a required configuration value is absent
 * @throws KafkaConsumerCacheException when the consumer cache fails to start
 * @throws UnknownHostException        when the local hostname cannot be resolved
 */
public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
        @Qualifier("curator") CuratorFramework curator,
        @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
        throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {

    // Node id from configuration; fall back to canonical-host:port so each node is still unique.
    String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
            CambriaConstants.kSetting_ApiNodeIdentifier);
    if (apiNodeId == null) {
        apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;

    log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
    final String mode = CambriaConstants.DMAAP;

    // Kafka bootstrap brokers from configuration; defaults to a local broker for dev setups.
    fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
            "kafka.metadata.broker.list");
    if (null == fkafkaBrokers) {
        fkafkaBrokers = "localhost:9092";

    // Consumer-cache toggle: "cambria.consumer.cache.enabled" overrides the compiled default.
    boolean kSetting_EnableCache = kDefault_IsCacheEnabled;
    String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
            "cambria.consumer.cache.enabled");
    if (null != strkSetting_EnableCache)
        kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);

    final boolean isCacheEnabled = kSetting_EnableCache;

    // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
    if (isCacheEnabled) {
        fCache = KafkaConsumerCache.getInstance();

    if (fCache != null) {
        fCache.setfMetrics(metrics);
        fCache.setfApiId(apiNodeId);
        fCache.startCache(mode, curator);
        if(kafkaLiveLockAvoider!=null){
            // Register a per-node watcher so a stuck rebalance can be broken by
            // forcing a poll on local consumers (see makeAvoidanceCallback).
            kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
            fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
/**
 * @see com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
 *      java.lang.String, java.lang.String, int, java.lang.String)
 *
 * Used by EventServiceImpl.getEvents() to obtain a Kafka consumer, either
 * from the consumer cache or by creating a new connection. Creation is
 * serialized per (topic + group) via a local monitor object, and guarded
 * cluster-wide by a ZooKeeper InterProcessMutex so only one node creates a
 * given (topic, group, consumer) at a time. This also gets the list of other
 * consumer objects for the same consumer group and sets it on the
 * KafkaConsumer object; that list may be used during poll-rebalancing.
 *
 * NOTE(review): lines are elided in this excerpt — the declaration of the
 * local variable "kc", the "try" keywords matching the catch blocks, the
 * finally/lock-release block, the return statement and several closing
 * braces and log arguments are missing from the visible text.
 */
public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
        String remotehost) throws UnavailableException, CambriaApiException {

    // To synchronize based on the consumer group.
    // NOTE(review): unsynchronized check-then-put on synchash — two threads
    // racing on the same key may each install a distinct monitor; confirm
    // callers, or consider ConcurrentHashMap.computeIfAbsent.
    Object syncObject = synchash.get(topic + consumerGroupName);
    if (null == syncObject) {
        syncObject = new Object();
        synchash.put(topic + consumerGroupName, syncObject);

    synchronized (syncObject) {
        // Cache hit path: reuse the consumer already owned by this node.
        kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null; // consumerId
    } catch (KafkaConsumerCacheException e) {
        log.info("######@@@@### Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName
                + "::" + consumerId);
        log.error("####@@@@## Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName
                + "::" + consumerId);
        throw new UnavailableException(e);

    // Ideally if cache exists below flow should be skipped. If cache
    // exist, then create this first time on this node.
    log.info("^Kafka consumer cache value " + topic + "::" + consumerGroupName + "::" + consumerId + " =>"

    // Cluster-wide creation lock so only one node builds this consumer.
    final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
            "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
    boolean locked = false;

        // Bounded wait — give up after 30s rather than blocking the request forever.
        locked = ipLock.acquire(30, TimeUnit.SECONDS);
        log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
                + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost);
        throw new UnavailableException(
                "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
                        + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost);

    // ConfigurationReader.getCurator().checkExists().forPath("S").

    log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
            + "], on topic [" + topic + "].");

    // Claim ownership in the cache before creating the real connection.
    fCache.signalOwnership(topic, consumerGroupName, consumerId);

    final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
    long fCreateTimeMs = System.currentTimeMillis();
    KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
    kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
    log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));

    if (fCache != null) {
        fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc); //

    } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
            "Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId);
        throw new UnavailableException("Couldn't connect to ZK.");
    } catch (KafkaConsumerCacheException e) {
        // Best-effort: a cache failure here degrades performance, not correctness.
        log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage()
                + " " + consumerGroupName + "/" + consumerId);
    } catch (UnavailableException u) {
        log.info("Failed and in UnavailableException block " + u.getMessage() + " " + consumerGroupName
        throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u);
    } catch (Exception e) {
        log.info("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
        log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
    } catch (Exception e) {
        // Releasing the inter-process lock failed (presumably in a finally block — elided).
        throw new UnavailableException("Error while releasing consumer factory lock" + e, e);
/**
 * Removes the given consumer from the consumer cache (delegating to
 * KafkaConsumerCache.dropConsumer). No-op when caching is disabled
 * (fCache is null).
 *
 * NOTE(review): closing braces are elided in this excerpt.
 *
 * @param topic         topic the consumer reads
 * @param consumerGroup consumer group name
 * @param clientId      consumer/client identifier
 */
public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) {
    if (fCache != null) {
        fCache.dropConsumer(topic, consumerGroup, clientId);
/**
 * @return all consumers currently held in the cache.
 *
 * NOTE(review): unlike destroyConsumer, this does not null-check fCache —
 * it will throw a NullPointerException when caching is disabled; confirm
 * callers only invoke it with the cache enabled. Closing brace elided.
 */
public synchronized Collection<? extends Consumer> getConsumers() {
    return fCache.getConsumers();
/**
 * Drops every consumer from the cache.
 *
 * NOTE(review): no null-check on fCache — throws NullPointerException when
 * caching is disabled; confirm callers. Closing brace elided in this excerpt.
 */
public synchronized void dropCache() {
    fCache.dropAllConsumers();
// Shared consumer cache; stays null when "cambria.consumer.cache.enabled" is false.
private KafkaConsumerCache fCache;
// Live-lock avoider registered in the constructor; handed to each new Kafka011Consumer.
private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
// Kafka bootstrap broker list ("kafka.metadata.broker.list", default "localhost:9092").
private String fkafkaBrokers;
/**
 * Builds the dotted settings key "&lt;prefix&gt;.&lt;key&gt;" used for property lookups.
 * NOTE(review): closing brace elided in this excerpt.
 */
private static String makeLongKey(String key, String prefix) {
    return prefix + "." + key;
/**
 * Copies the configured value of "&lt;prefix&gt;.&lt;key&gt;" into {@code target} under
 * {@code key} when such a property exists; otherwise leaves {@code target}
 * untouched.
 *
 * NOTE(review): closing braces elided in this excerpt.
 *
 * @param target properties object receiving the setting
 * @param key    short setting name (e.g. "session.timeout.ms")
 * @param prefix namespace prefix (e.g. "kafka")
 */
private void transferSettingIfProvided(Properties target, String key, String prefix) {
    String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
    if (null != keyVal) {
        log.info("Setting [" + key + "] to " + keyVal + ".");
        target.put(key, keyVal);
/**
 * Creates the Properties required to open a Kafka consumer connection.
 *
 * The group name is rewritten to the format "&lt;groupId&gt;--&lt;topic&gt;" so the
 * same external group id used on multiple topics maps to distinct Kafka
 * groups: a single group id spanning multiple topics may start frequent
 * consumer rebalancing on all the topics. Replacing it makes it unique
 * (fix for CPFMF-644).
 *
 * NOTE(review): the "return props;" statement and closing brace are elided
 * in this excerpt.
 *
 * @param topic      topic this consumer will read
 * @param groupId    external consumer group id (rewritten, see above)
 * @param consumerId Kafka client.id for this consumer
 * @return consumer Properties (per elided return statement)
 */
private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
    final Properties props = new Properties();
    // fakeGroupName is added to avoid multiple consumer group for multiple topics. Do not change this logic.
    // Fix for CPFMF-644.
    final String fakeGroupName = groupId + "--" + topic;
    props.put("group.id", fakeGroupName);
    props.put("enable.auto.commit", "false"); // offsets are committed explicitly (0.11 API)
    props.put("bootstrap.servers", fkafkaBrokers);
    /*props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");*/
    props.put("client.id", consumerId);

    // additional settings: start with our defaults, then pull in configured overrides
    populateKafkaInternalDefaultsMap();
    for (String key : KafkaConsumerKeys) {
        transferSettingIfProvided(props, key, "kafka");

    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
317 private static final String KafkaConsumerKeys[] = { "bootstrap.servers", "heartbeat.interval.ms",
318 "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes",
319 "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level",
320 "fetch.max.bytes", "request.timeout.ms", "fetch.max.wait.bytes", "reconnect.backoff.max.ms",
321 "max.partition.fetch.bytes", "reconnect.backoff.max.ms", "reconnect.backoff.ms", "retry.backoff.ms",
322 "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes", "metadata.max.age.ms" };
/**
 * Intended to seed internal default values (consumer timeout, zookeeper
 * timeout, etc.) before configured overrides are applied.
 * Currently an intentional no-op placeholder.
 */
private static void populateKafkaInternalDefaultsMap() { }
/**
 * Builds the LiveLockAvoidance callback registered for this node: when a
 * rebalance unlock is signalled for a consumer group, it forces an immediate
 * poll on matching local consumers. (Original note: the starterIncrement
 * value is just to emulate calling certain consumers; in this test app all
 * the consumers are local.)
 *
 * NOTE(review): lines are elided in this excerpt — the getAppId() body,
 * annotations and closing braces are missing from the visible text.
 *
 * @param appId this API node's identifier, returned by the callback's getAppId()
 * @return the anonymous LiveLockAvoidance implementation
 */
private LiveLockAvoidance makeAvoidanceCallback(final String appId) {

    return new LiveLockAvoidance() {

        public String getAppId() {

        public void handleRebalanceUnlock(String groupName) {
            log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName);
            // "group::" — presumably a prefix matching every consumer id in the
            // group; confirm against Kafka011ConsumerUtil.forcePollOnConsumer.
            Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::");
/**
 * Auto-generated stub variant of getConsumerFor; not implemented.
 *
 * NOTE(review): the return statement and closing brace are elided in this
 * excerpt — only the auto-generated TODO body is visible.
 */
@SuppressWarnings("rawtypes")
public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
        String remotehost) throws UnavailableException, CambriaApiException {
    // TODO Auto-generated method stub
363 private HashMap<String, Object> synchash = new HashMap<String, Object>();