1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.beans;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Properties;
30 import java.util.concurrent.TimeUnit;
32 import org.apache.curator.framework.CuratorFramework;
33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
34 //import org.slf4j.Logger;
35 //import org.slf4j.LoggerFactory;
36 import com.att.eelf.configuration.EELFLogger;
37 import com.att.eelf.configuration.EELFManager;
38 import org.springframework.beans.factory.annotation.Qualifier;
40 import com.att.nsa.cambria.backends.Consumer;
41 import com.att.nsa.cambria.backends.ConsumerFactory;
42 import com.att.nsa.cambria.backends.MetricsSet;
43 import com.att.nsa.cambria.backends.kafka.KafkaConsumer;
44 import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache;
45 import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
46 import com.att.nsa.cambria.constants.CambriaConstants;
47 import com.att.nsa.cambria.utils.ConfigurationReader;
48 import com.att.nsa.drumlin.till.nv.rrNvReadable;
49 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
50 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
51 import kafka.consumer.ConsumerConfig;
52 import kafka.javaapi.consumer.ConsumerConnector;
58 public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
60 //private static final Logger log = LoggerFactory .getLogger(DMaaPKafkaConsumerFactory.class);
// EELF logger for this factory; replaces the commented-out SLF4J logger above.
61 private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
63 * constructor initialization
68 * @throws missingReqdSetting
69 * @throws KafkaConsumerCacheException
70 * @throws UnknownHostException
/**
 * Constructor: resolves this API node's identifier and the ZooKeeper connect
 * string from the AJSC message-router property map (falling back to the
 * injected settings reader / compiled-in defaults), then creates and starts
 * the shared consumer cache when caching is enabled.
 *
 * @param settings property reader used for the port fallback in the node id
 * @param metrics  metrics set (injected; not referenced in the visible body)
 * @param curator  Curator client used to start the consumer cache
 * @throws missingReqdSetting          if a required setting is absent
 * @throws KafkaConsumerCacheException if the consumer cache fails to start
 * @throws UnknownHostException        if the local host name cannot be resolved
 */
72 public DMaaPKafkaConsumerFactory(
73 @Qualifier("propertyReader") rrNvReadable settings,
74 @Qualifier("dMaaPMetricsSet") MetricsSet metrics,
75 @Qualifier("curator") CuratorFramework curator)
76 throws missingReqdSetting, KafkaConsumerCacheException,
77 UnknownHostException {
78 /*final String apiNodeId = settings.getString(
79 CambriaConstants.kSetting_ApiNodeIdentifier,
80 InetAddress.getLocalHost().getCanonicalHostName()
82 + settings.getInt(CambriaConstants.kSetting_Port,
83 CambriaConstants.kDefault_Port));*/
// Prefer an explicitly configured node id; otherwise derive one from
// "<canonical-host-name>:<port>" so each API node has a distinct identity.
84 String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
85 CambriaConstants.kSetting_ApiNodeIdentifier);
86 if (apiNodeId == null){
88 apiNodeId=InetAddress.getLocalHost().getCanonicalHostName()
90 + settings.getInt(CambriaConstants.kSetting_Port,
91 CambriaConstants.kDefault_Port);
94 log.info("This Cambria API Node identifies itself as [" + apiNodeId
// Mode string handed to the consumer cache (DMaaP deployment mode).
96 final String mode = CambriaConstants.DMAAP;
97 /*fSettings = settings;
98 fZooKeeper = fSettings.getString(kSettings_KafkaZookeeper, settings
99 .getString(CambriaConstants.kSetting_ZkConfigDbServers,
100 CambriaConstants.kDefault_ZkConfigDbServers));*/
// ZooKeeper connect string resolution order:
// kafka.client.zookeeper -> config-db servers setting -> compiled-in default.
102 String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
103 if(null==strkSettings_KafkaZookeeper){
104 strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
105 if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
108 fZooKeeper= strkSettings_KafkaZookeeper;
110 //final boolean isCacheEnabled = fSettings.getBoolean(
111 // kSetting_EnableCache, kDefault_IsCacheEnabled);
// NOTE(review): this local boolean shadows the (presumed) ConsumerFactory
// kSetting_EnableCache key constant, so the property key looked up on the
// next line is the string "true"/"false" rather than the intended setting
// name — verify against the ConsumerFactory interface and the properties file.
112 boolean kSetting_EnableCache= kDefault_IsCacheEnabled;
113 String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_EnableCache+"");
114 if(null!=strkSetting_EnableCache)
116 kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache);
119 final boolean isCacheEnabled = kSetting_EnableCache;
// Build the shared consumer cache (keyed by this node's id) only when enabled;
// fCache stays null when caching is disabled.
122 fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
124 if (fCache != null) {
125 fCache.startCache(mode, curator);
/**
 * Returns a Kafka consumer for the given (topic, group, consumer id) triple.
 * Tries the shared cache first; on a miss, takes a per-triple inter-process
 * Curator lock, creates a new high-level Kafka consumer connector, and
 * registers it in the cache.
 *
 * @param topic             the topic to consume
 * @param consumerGroupName the Kafka consumer group
 * @param consumerId        the individual consumer/client id
 * @param timeoutMs         caller-supplied timeout (not referenced in the visible body)
 * @return a cached or newly created Consumer
 * @throws UnavailableException on cache errors, lock-acquisition failure,
 *         ZooKeeper connection timeout, or any other creation failure
 */
130 public Consumer getConsumerFor(String topic, String consumerGroupName,
131 String consumerId, int timeoutMs) throws UnavailableException {
// Fast path: reuse a cached consumer when the cache is enabled.
135 kc = (fCache != null) ? fCache.getConsumerFor(topic,
136 consumerGroupName, consumerId) : null;
137 } catch (KafkaConsumerCacheException e) {
138 throw new UnavailableException(e);
// Cross-process mutex so only one API node at a time creates a consumer
// for this (topic, group, consumer) triple.
143 final InterProcessMutex ipLock = new InterProcessMutex( ConfigurationReader.getCurator(), "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
144 // final InterProcessMutex fLock = new InterProcessMutex(
145 // ConfigurationReader.getCurator(), "/consumerFactory/"
146 // + topic + "/" + consumerGroupName + "/"
148 boolean locked = false;
// Bounded wait: give up after 30s rather than blocking the request forever.
151 locked = ipLock.acquire(30, TimeUnit.SECONDS);
153 // FIXME: this seems to cause trouble in some cases. This exception
154 // gets thrown routinely. Possibly a consumer trying multiple servers
155 // at once, producing a never-ending cycle of overlapping locks?
156 // The problem is that it throws and winds up sending a 503 to the
157 // client, which would be incorrect if the client is causing trouble
158 // by switching back and forth.
160 throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")");
163 // if (!fLock.acquire(30, TimeUnit.SECONDS)) {
164 // throw new UnavailableException(
165 // "Could not acquire lock in order to create (topic, group, consumer) = "
166 // + "(" + topic + ", " + consumerGroupName
167 // + ", " + consumerId + ")");
// Record that this node now owns the consumer before building it.
170 fCache.signalOwnership(topic, consumerGroupName, consumerId);
172 log.info("Creating Kafka consumer for group ["
173 + consumerGroupName + "], consumer [" + consumerId
174 + "], on topic [" + topic + "].");
// Per-topic group name keeps offsets independent across topics for the
// same logical consumer group.
176 final String fakeGroupName = consumerGroupName + "--" + topic;
178 final ConsumerConfig ccc = createConsumerConfig(fakeGroupName,
180 final ConsumerConnector cc = kafka.consumer.Consumer
181 .createJavaConsumerConnector(ccc);
182 kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc);
184 if (fCache != null) {
185 fCache.putConsumerFor(topic, consumerGroupName, consumerId,
// ZK timeout is surfaced to the caller as an Unavailable (503-style) error.
188 } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
189 log.error("Exception find at getConsumerFor(String topic, String consumerGroupName,\r\n" +
190 " String consumerId, int timeoutMs) : " + x);
191 throw new UnavailableException("Couldn't connect to ZK.");
// Cache failures are logged but do not fail the request — the consumer
// was created; only caching (and thus reuse) is lost.
192 } catch (KafkaConsumerCacheException e) {
193 log.error("Failed to cache consumer (this may have performance implications): "
195 } catch (Exception e) {
196 throw new UnavailableException(
197 "Error while acquiring consumer factory lock", e);
// Lock release failure (presumably in a finally block) is also surfaced
// as UnavailableException.
203 } catch (Exception e) {
204 throw new UnavailableException("Error while releasing consumer factory lock", e);
/**
 * Drops the cached consumer for the given (topic, group, client) triple,
 * if caching is enabled. No-op when the cache is disabled (fCache == null).
 *
 * @param topic         the topic the consumer was created for
 * @param consumerGroup the consumer group
 * @param clientId      the individual consumer/client id
 */
214 public synchronized void destroyConsumer(String topic,
215 String consumerGroup, String clientId) {
216 if (fCache != null) {
217 fCache.dropConsumer(topic, consumerGroup, clientId);
/**
 * Returns all consumers currently held in the cache.
 * NOTE(review): unlike destroyConsumer, there is no null check on fCache
 * here — this throws NPE when caching is disabled; verify callers.
 *
 * @return the cache's collection of consumers
 */
222 public synchronized Collection<? extends Consumer> getConsumers() {
223 return fCache.getConsumers();
/**
 * Drops every consumer from the cache.
 * NOTE(review): no null check on fCache — this throws NPE when caching is
 * disabled; verify callers.
 */
227 public synchronized void dropCache() {
228 fCache.dropAllConsumers();
/**
 * Builds a Kafka high-level ConsumerConfig for the given group/consumer:
 * sets the ZooKeeper connect string, group id, and consumer id, then layers
 * on the internal defaults map and any "kafka."-prefixed overrides from the
 * AJSC property map.
 *
 * @param groupId the (per-topic) consumer group id
 * @return a ConsumerConfig ready for createJavaConsumerConnector
 */
231 private ConsumerConfig createConsumerConfig(String groupId,
233 final Properties props = new Properties();
234 props.put("zookeeper.connect", fZooKeeper);
235 props.put("group.id", groupId);
236 props.put("consumer.id", consumerId);
237 //props.put("auto.commit.enable", "false");
238 // additional settings: start with our defaults, then pull in configured
240 props.putAll(KafkaInternalDefaults);
// Copy each recognized consumer key from configuration, when provided.
241 for (String key : KafkaConsumerKeys) {
242 transferSettingIfProvided(props, key, "kafka");
245 return new ConsumerConfig(props);
248 //private final rrNvReadable fSettings;
// Shared consumer cache; null when caching is disabled (see constructor).
249 private final KafkaConsumerCache fCache;
// ZooKeeper connect string resolved in the constructor.
251 private String fZooKeeper;
// Property key for the Kafka client's ZooKeeper connect string.
253 private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper";
// Default Kafka consumer settings, filled by populateKafkaInternalDefaultsMap().
255 private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<String, String>();
258 * Populates the defaults map with values such as the consumer timeout, ZooKeeper timeouts, etc.
/**
 * Loads the Kafka-internal default settings (consumer timeout, ZooKeeper
 * connection/session/sync timeouts, auto-commit interval and enable flag,
 * max fetch size) from the AJSC message-router property map into
 * KafkaInternalDefaults. Any failure is logged and swallowed, leaving the
 * map partially (or un-) populated.
 */
262 public static void populateKafkaInternalDefaultsMap() {
263 //@Qualifier("propertyReader") rrNvReadable setting) {
// NOTE(review): assumes the msgRtr properties map is loadable and non-null;
// a missing properties file lands in the catch below.
266 HashMap<String, String> map1= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
268 KafkaInternalDefaults.put("consumer.timeout.ms",
269 // AJSCPropertiesMap.get(CambriaConstants.msgRtr_prop, "consumer.timeout.ms"));
270 map1.get( "consumer.timeout.ms"));
272 KafkaInternalDefaults.put("zookeeper.connection.timeout.ms",
273 //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.connection.timeout.ms"));
274 map1.get("zookeeper.connection.timeout.ms"));
275 KafkaInternalDefaults.put("zookeeper.session.timeout.ms",
276 //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.session.timeout.ms"));
277 map1.get("zookeeper.session.timeout.ms"));
278 KafkaInternalDefaults.put("zookeeper.sync.time.ms",
279 // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.sync.time.ms"));
280 map1.get( "zookeeper.sync.time.ms"));
281 KafkaInternalDefaults.put("auto.commit.interval.ms",
282 //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.interval.ms"));
283 map1.get( "auto.commit.interval.ms"));
284 KafkaInternalDefaults.put("fetch.message.max.bytes",
285 //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "fetch.message.max.bytes"));
286 map1.get("fetch.message.max.bytes"));
287 KafkaInternalDefaults.put("auto.commit.enable",
288 // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.enable"));
289 map1.get("auto.commit.enable"));
// Best-effort load: log and continue rather than failing startup.
290 } catch (Exception e) {
291 log.error("Failed to load Kafka Internal Properties.", e);
// Kafka consumer settings that may be overridden via configuration under the
// "kafka." prefix; each is copied into the consumer config when provided
// (see createConsumerConfig / transferSettingIfProvided).
295 private static final String KafkaConsumerKeys[] = { "socket.timeout.ms",
296 "socket.receive.buffer.bytes", "fetch.message.max.bytes",
297 "auto.commit.interval.ms", "queued.max.message.chunks",
298 "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes",
299 "rebalance.backoff.ms", "refresh.leader.backoff.ms",
300 "auto.offset.reset", "consumer.timeout.ms",
301 "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms",
302 "zookeeper.sync.time.ms" };
304 private static String makeLongKey(String key, String prefix) {
305 return prefix + "." + key;
/**
 * Copies one configured setting into the target Properties, if present:
 * looks up "prefix.key" in the AJSC message-router property map and, when a
 * value is found, stores it in target under the short key.
 *
 * @param target the Properties being assembled for the consumer config
 * @param key    the short setting name (e.g. "socket.timeout.ms")
 */
308 private void transferSettingIfProvided(Properties target, String key,
310 String keyVal= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,makeLongKey(key, prefix));
312 // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
314 // final String val = fSettings
315 // .getString(makeLongKey(key, prefix), "");
316 log.info("Setting [" + key + "] to " + keyVal + ".");
317 target.put(key, keyVal);