Add Initial Code Import
[dmaap/messagerouter/msgrtr.git] src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.nsa.cambria.beans;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.cambria.backends.Consumer;
import com.att.nsa.cambria.backends.ConsumerFactory;
import com.att.nsa.cambria.backends.MetricsSet;
import com.att.nsa.cambria.backends.kafka.KafkaConsumer;
import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache;
import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
import com.att.nsa.cambria.constants.CambriaConstants;
import com.att.nsa.cambria.utils.ConfigurationReader;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

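/*
 * Minimal usage sketch (hypothetical wiring; the bean names mirror the
 * @Qualifier values on the constructor below):
 *
 *   ConsumerFactory factory =
 *       new DMaaPKafkaConsumerFactory(settings, metrics, curator);
 *   Consumer consumer =
 *       factory.getConsumerFor("myTopic", "myGroup", "client-0", 500);
 */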
/**
 * Factory for Kafka-backed {@link Consumer} instances. Consumers are created
 * under a ZooKeeper inter-process lock and, when caching is enabled, reused
 * through a shared {@link KafkaConsumerCache}.
 */
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {

    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);

    /**
     * Constructs the factory, resolving this node's API identifier and the
     * ZooKeeper connect string, and starting the consumer cache when caching
     * is enabled.
     *
     * @param settings property reader used for fallback configuration values
     * @param metrics metrics collector handed to the consumer cache
     * @param curator ZooKeeper client used to coordinate the cache
     * @throws missingReqdSetting if a required setting is absent
     * @throws KafkaConsumerCacheException if the cache cannot be started
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    public DMaaPKafkaConsumerFactory(
            @Qualifier("propertyReader") rrNvReadable settings,
            @Qualifier("dMaaPMetricsSet") MetricsSet metrics,
            @Qualifier("curator") CuratorFramework curator)
            throws missingReqdSetting, KafkaConsumerCacheException,
            UnknownHostException {
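        // Resolve this API node's identity: prefer the configured node
        // identifier property; otherwise fall back to
        // <canonical-hostname>:<port> so each node gets a stable, unique id.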
        String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                CambriaConstants.kSetting_ApiNodeIdentifier);
        if (apiNodeId == null) {
            apiNodeId = InetAddress.getLocalHost().getCanonicalHostName()
                    + ":"
                    + settings.getInt(CambriaConstants.kSetting_Port,
                            CambriaConstants.kDefault_Port);
        }

        log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
        final String mode = CambriaConstants.DMAAP;
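        // ZooKeeper connect string: the Kafka-specific setting wins, then the
        // general config-db servers setting, then the built-in default.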
        String strKafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                kSettings_KafkaZookeeper);
        if (strKafkaZookeeper == null) {
            strKafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    CambriaConstants.kSetting_ZkConfigDbServers);
            if (strKafkaZookeeper == null) {
                strKafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
            }
        }
        fZooKeeper = strKafkaZookeeper;

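        // Consumer caching defaults to on; an explicit kSetting_EnableCache
        // property in the MsgRtr property file overrides the default.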
        final String strEnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                kSetting_EnableCache);
        final boolean isCacheEnabled = (strEnableCache != null)
                ? Boolean.parseBoolean(strEnableCache)
                : kDefault_IsCacheEnabled;

        fCache = isCacheEnabled ? new KafkaConsumerCache(apiNodeId, metrics) : null;
        if (fCache != null) {
            fCache.startCache(mode, curator);
        }
    }

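    // Serve consumers from the cache when it is enabled; a cache failure is
    // surfaced as UnavailableException (a 503 to the client).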
    @Override
    public Consumer getConsumerFor(String topic, String consumerGroupName,
            String consumerId, int timeoutMs) throws UnavailableException {
        KafkaConsumer kc;

        try {
            kc = (fCache != null) ? fCache.getConsumerFor(topic,
                    consumerGroupName, consumerId) : null;
        } catch (KafkaConsumerCacheException e) {
            throw new UnavailableException(e);
        }

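        // Cache miss: build a new consumer. Creation runs under a
        // ZooKeeper-backed inter-process mutex so that concurrent requests
        // (from this node or its peers) cannot create duplicate consumers for
        // the same (topic, group, consumer) triple.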
        if (kc == null) {

            final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
                    "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
            boolean locked = false;
            try {
                locked = ipLock.acquire(30, TimeUnit.SECONDS);
                if (!locked) {
                    // FIXME: this seems to cause trouble in some cases. This exception
                    // gets thrown routinely. Possibly a consumer trying multiple servers
                    // at once, producing a never-ending cycle of overlapping locks?
                    // The problem is that it throws and winds up sending a 503 to the
                    // client, which would be incorrect if the client is causing trouble
                    // by switching back and forth.
                    throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = "
                            + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")");
                }
                if (fCache != null) {
                    fCache.signalOwnership(topic, consumerGroupName, consumerId);
                }

                log.info("Creating Kafka consumer for group [" + consumerGroupName
                        + "], consumer [" + consumerId + "], on topic [" + topic + "].");

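                // Kafka group ids are global across topics; the topic suffix
                // gives each (group, topic) pair its own Kafka consumer group.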
                final String fakeGroupName = consumerGroupName + "--" + topic;

                final ConsumerConfig ccc = createConsumerConfig(fakeGroupName, consumerId);
                final ConsumerConnector cc = kafka.consumer.Consumer.createJavaConsumerConnector(ccc);
                kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc);

                if (fCache != null) {
                    fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc);
                }
            } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
                log.warn("Kafka consumer couldn't connect to ZK.");
                throw new UnavailableException("Couldn't connect to ZK.");
            } catch (KafkaConsumerCacheException e) {
                log.warn("Failed to cache consumer (this may have performance implications): "
                        + e.getMessage());
            } catch (Exception e) {
                throw new UnavailableException(
                        "Error while acquiring consumer factory lock", e);
            } finally {
                if (locked) {
                    try {
                        ipLock.release();
                    } catch (Exception e) {
                        throw new UnavailableException("Error while releasing consumer factory lock", e);
                    }
                }
            }
        }

        return kc;
    }

    @Override
    public synchronized void destroyConsumer(String topic,
            String consumerGroup, String clientId) {
        if (fCache != null) {
            fCache.dropConsumer(topic, consumerGroup, clientId);
        }
    }

    @Override
    public synchronized Collection<? extends Consumer> getConsumers() {
        return (fCache != null) ? fCache.getConsumers() : Collections.<Consumer> emptyList();
    }

    @Override
    public synchronized void dropCache() {
        if (fCache != null) {
            fCache.dropAllConsumers();
        }
    }

    private ConsumerConfig createConsumerConfig(String groupId, String consumerId) {
        final Properties props = new Properties();
        props.put("zookeeper.connect", fZooKeeper);
        props.put("group.id", groupId);
        props.put("consumer.id", consumerId);
        // additional settings: start with our defaults, then pull in configured
        // overrides
        props.putAll(KafkaInternalDefaults);
        for (String key : KafkaConsumerKeys) {
            transferSettingIfProvided(props, key, "kafka");
        }

        return new ConsumerConfig(props);
    }

    private final KafkaConsumerCache fCache;

    private final String fZooKeeper;

    private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper";

    private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<String, String>();

    /**
     * Seeds {@link #KafkaInternalDefaults} with the consumer, ZooKeeper, and
     * auto-commit timeout/interval defaults read from the MsgRtr property
     * file. Values missing from the property file are skipped.
     */
    public static void populateKafkaInternalDefaultsMap() {
        try {
            final HashMap<String, String> props =
                    AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);

            final String[] internalKeys = { "consumer.timeout.ms",
                    "zookeeper.connection.timeout.ms", "zookeeper.session.timeout.ms",
                    "zookeeper.sync.time.ms", "auto.commit.interval.ms",
                    "fetch.message.max.bytes", "auto.commit.enable" };

            for (String key : internalKeys) {
                final String value = (props == null) ? null : props.get(key);
                // skip missing values: Properties (a Hashtable) rejects nulls
                // when the defaults are copied in createConsumerConfig()
                if (value != null) {
                    KafkaInternalDefaults.put(key, value);
                }
            }
        } catch (Exception e) {
            log.error("Failed to load Kafka Internal Properties.", e);
        }
    }

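    // Settings of the (old) Kafka high-level consumer that may be overridden
    // from the MsgRtr property file via a "kafka." prefix.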
    private static final String[] KafkaConsumerKeys = { "socket.timeout.ms",
            "socket.receive.buffer.bytes", "fetch.message.max.bytes",
            "auto.commit.interval.ms", "queued.max.message.chunks",
            "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes",
            "rebalance.backoff.ms", "refresh.leader.backoff.ms",
            "auto.offset.reset", "consumer.timeout.ms",
            "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms",
            "zookeeper.sync.time.ms" };

    private static String makeLongKey(String key, String prefix) {
        return prefix + "." + key;
    }

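    // Copies a configured override into the consumer properties; e.g. a
    // "kafka.socket.timeout.ms" entry in the MsgRtr property file becomes the
    // consumer's "socket.timeout.ms" setting.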
    private void transferSettingIfProvided(Properties target, String key, String prefix) {
        final String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                makeLongKey(key, prefix));

        if (keyVal != null) {
            log.info("Setting [" + key + "] to " + keyVal + ".");
            target.put(key, keyVal);
        }
    }

}