/*******************************************************************************
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.nsa.cambria.beans;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.cambria.backends.Consumer;
import com.att.nsa.cambria.backends.ConsumerFactory;
import com.att.nsa.cambria.backends.MetricsSet;
import com.att.nsa.cambria.backends.kafka.KafkaConsumer;
import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache;
import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
import com.att.nsa.cambria.constants.CambriaConstants;
import com.att.nsa.cambria.utils.ConfigurationReader;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
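
/**
 * Factory for Kafka consumers used by the DMaaP message router. When the
 * consumer cache is enabled, instances are shared across requests through a
 * {@link KafkaConsumerCache}; otherwise each request builds a fresh consumer
 * connector.
 */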
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {

	private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
	/**
	 * constructor initialization
	 *
	 * @param settings
	 * @param metrics
	 * @param curator
	 * @throws missingReqdSetting
	 * @throws KafkaConsumerCacheException
	 * @throws UnknownHostException
	 */
	public DMaaPKafkaConsumerFactory(
			@Qualifier("propertyReader") rrNvReadable settings,
			@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
			@Qualifier("curator") CuratorFramework curator)
			throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
		// The API node identifier defaults to host:port when not configured.
		String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				CambriaConstants.kSetting_ApiNodeIdentifier);
		if (apiNodeId == null) {
			apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":"
					+ settings.getInt(CambriaConstants.kSetting_Port, CambriaConstants.kDefault_Port);
		}

		log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
		final String mode = CambriaConstants.DMAAP;

		// Resolve the ZooKeeper connect string: the explicit Kafka setting first,
		// then the config-db setting, then the built-in default.
		String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSettings_KafkaZookeeper);
		if (null == strkSettings_KafkaZookeeper) {
			strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
					CambriaConstants.kSetting_ZkConfigDbServers);
			if (null == strkSettings_KafkaZookeeper)
				strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
		}
		fZooKeeper = strkSettings_KafkaZookeeper;
		// A local boolean previously shadowed the kSetting_EnableCache property
		// key inherited from ConsumerFactory, so the lookup used the string
		// "true" as the key. Look the property up by its actual key instead.
		boolean cacheEnabled = kDefault_IsCacheEnabled;
		String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				kSetting_EnableCache);
		if (null != strkSetting_EnableCache)
			cacheEnabled = Boolean.parseBoolean(strkSetting_EnableCache);

		final boolean isCacheEnabled = cacheEnabled;

		fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, metrics) : null;
		if (fCache != null) {
			fCache.startCache(mode, curator);
		}
	}
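
	/**
	 * Returns a consumer for the given (topic, group, consumer) triple, serving
	 * from the cache when possible. On a cache miss, creation happens under a
	 * cross-node Curator lock so that concurrent requests do not build duplicate
	 * consumers for the same triple.
	 */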
	@Override
	public Consumer getConsumerFor(String topic, String consumerGroupName,
			String consumerId, int timeoutMs) throws UnavailableException {
		KafkaConsumer kc;

		try {
			kc = (fCache != null) ? fCache.getConsumerFor(topic,
					consumerGroupName, consumerId) : null;
		} catch (KafkaConsumerCacheException e) {
			throw new UnavailableException(e);
		}
		if (kc == null) {
			// Cache miss: create the consumer under a cross-node lock so only one
			// API node builds a given (topic, group, consumer) at a time.
			final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
					"/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
			boolean locked = false;
			try {
				locked = ipLock.acquire(30, TimeUnit.SECONDS);
				if (!locked) {
					// FIXME: this seems to cause trouble in some cases. This exception
					// gets thrown routinely. Possibly a consumer trying multiple servers
					// at once, producing a never-ending cycle of overlapping locks?
					// The problem is that it throws and winds up sending a 503 to the
					// client, which would be incorrect if the client is causing trouble
					// by switching back and forth.
					throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = "
							+ "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")");
				}

				// fCache may be null when caching is disabled; guard to avoid an NPE.
				if (fCache != null) {
					fCache.signalOwnership(topic, consumerGroupName, consumerId);
				}
				log.info("Creating Kafka consumer for group ["
						+ consumerGroupName + "], consumer [" + consumerId
						+ "], on topic [" + topic + "].");

				// Qualify the group name with the topic so each (group, topic)
				// pair gets its own connector-level group id.
				final String fakeGroupName = consumerGroupName + "--" + topic;

				final ConsumerConfig ccc = createConsumerConfig(fakeGroupName, consumerId);
				final ConsumerConnector cc = kafka.consumer.Consumer.createJavaConsumerConnector(ccc);
				kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc);

				if (fCache != null) {
					fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc);
				}
			} catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
				log.warn("Kafka consumer couldn't connect to ZK.");
				throw new UnavailableException("Couldn't connect to ZK.");
			} catch (KafkaConsumerCacheException e) {
				log.warn("Failed to cache consumer (this may have performance implications): "
						+ e.getMessage());
			} catch (Exception e) {
				throw new UnavailableException(
						"Error while acquiring consumer factory lock", e);
			} finally {
				if (locked) {
					try {
						ipLock.release();
					} catch (Exception e) {
						throw new UnavailableException("Error while releasing consumer factory lock", e);
					}
				}
			}
		}

		return kc;
	}
	@Override
	public synchronized void destroyConsumer(String topic,
			String consumerGroup, String clientId) {
		if (fCache != null) {
			fCache.dropConsumer(topic, consumerGroup, clientId);
		}
	}

	@Override
	public synchronized Collection<? extends Consumer> getConsumers() {
		return fCache.getConsumers();
	}

	@Override
	public synchronized void dropCache() {
		fCache.dropAllConsumers();
	}
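
	/**
	 * Builds the configuration for Kafka's legacy high-level consumer from the
	 * ZooKeeper connect string, the internal defaults, and any "kafka."-prefixed
	 * overrides found in the message router properties.
	 */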
	private ConsumerConfig createConsumerConfig(String groupId, String consumerId) {
		final Properties props = new Properties();
		props.put("zookeeper.connect", fZooKeeper);
		props.put("group.id", groupId);
		props.put("consumer.id", consumerId);

		// additional settings: start with our defaults, then pull in configured
		// overrides
		props.putAll(KafkaInternalDefaults);
		for (String key : KafkaConsumerKeys) {
			transferSettingIfProvided(props, key, "kafka");
		}

		return new ConsumerConfig(props);
	}
	private final KafkaConsumerCache fCache;
	private String fZooKeeper;

	private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper";

	private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<String, String>();
	/**
	 * putting values in hashmap like consumer timeout, zookeeper timeout, etc.
	 */
	public static void populateKafkaInternalDefaultsMap() {
		try {
			HashMap<String, String> map1 = com.att.ajsc.filemonitor.AJSCPropertiesMap
					.getProperties(CambriaConstants.msgRtr_prop);

			// Copy each configured default; skip missing keys so that
			// Properties.putAll (backed by Hashtable) never sees a null value.
			for (String key : new String[] { "consumer.timeout.ms",
					"zookeeper.connection.timeout.ms", "zookeeper.session.timeout.ms",
					"zookeeper.sync.time.ms", "auto.commit.interval.ms",
					"fetch.message.max.bytes", "auto.commit.enable" }) {
				final String val = map1.get(key);
				if (val != null) {
					KafkaInternalDefaults.put(key, val);
				}
			}
		} catch (Exception e) {
			log.error("Failed to load Kafka Internal Properties.", e);
		}
	}
	private static final String[] KafkaConsumerKeys = { "socket.timeout.ms",
			"socket.receive.buffer.bytes", "fetch.message.max.bytes",
			"auto.commit.interval.ms", "queued.max.message.chunks",
			"rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes",
			"rebalance.backoff.ms", "refresh.leader.backoff.ms",
			"auto.offset.reset", "consumer.timeout.ms",
			"zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms",
			"zookeeper.sync.time.ms" };
	private static String makeLongKey(String key, String prefix) {
		return prefix + "." + key;
	}

	private void transferSettingIfProvided(Properties target, String key, String prefix) {
		String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				makeLongKey(key, prefix));
		if (null != keyVal) {
			log.info("Setting [" + key + "] to " + keyVal + ".");
			target.put(key, keyVal);
		}
	}
}