Fix Sonar critical for conditional statement
[dmaap/messagerouter/msgrtr.git] src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *  
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *  
 *******************************************************************************/
package com.att.nsa.cambria.beans;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import com.att.nsa.cambria.backends.Consumer;
import com.att.nsa.cambria.backends.ConsumerFactory;
import com.att.nsa.cambria.backends.MetricsSet;
import com.att.nsa.cambria.backends.kafka.KafkaConsumer;
import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache;
import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
import com.att.nsa.cambria.constants.CambriaConstants;
import com.att.nsa.cambria.utils.ConfigurationReader;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.ajsc.filemonitor.AJSCPropertiesMap;

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * @author author
 *
 */
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {

	private static final EELFLogger log = EELFManager.getInstance()
			.getLogger(DMaaPKafkaConsumerFactory.class);

	/**
	 * Constructor; resolves the API node identifier, the Kafka/ZooKeeper
	 * connection string and the cache-enable flag from configuration, then
	 * starts the consumer cache if caching is enabled.
	 * 
	 * @param settings
	 * @param metrics
	 * @param curator
	 * @throws missingReqdSetting
	 * @throws KafkaConsumerCacheException
	 * @throws UnknownHostException
	 */
	public DMaaPKafkaConsumerFactory(
			@Qualifier("propertyReader") rrNvReadable settings,
			@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
			@Qualifier("curator") CuratorFramework curator)
			throws missingReqdSetting, KafkaConsumerCacheException,
			UnknownHostException {

		String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
				CambriaConstants.kSetting_ApiNodeIdentifier);
		if (apiNodeId == null) {
			apiNodeId = InetAddress.getLocalHost().getCanonicalHostName()
					+ ":"
					+ settings.getInt(CambriaConstants.kSetting_Port,
							CambriaConstants.kDefault_Port);
		}

		log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
		final String mode = CambriaConstants.DMAAP;

		// Kafka/ZooKeeper connection string: prefer the Kafka-specific
		// setting, then the config-db servers, then the built-in default.
		String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(
				CambriaConstants.msgRtr_prop, kSettings_KafkaZookeeper);
		if (null == strkSettings_KafkaZookeeper) {
			strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(
					CambriaConstants.msgRtr_prop, CambriaConstants.kSetting_ZkConfigDbServers);
			if (null == strkSettings_KafkaZookeeper) {
				strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
			}
		}
		fZooKeeper = strkSettings_KafkaZookeeper;

		// Cache-enable flag: the lookup key is the kSetting_EnableCache
		// constant from ConsumerFactory; a shadowing local boolean would
		// turn the lookup key into the string "true"/"false".
		boolean isCacheEnabled = kDefault_IsCacheEnabled;
		final String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(
				CambriaConstants.msgRtr_prop, kSetting_EnableCache);
		if (null != strkSetting_EnableCache) {
			isCacheEnabled = Boolean.parseBoolean(strkSetting_EnableCache);
		}

		fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, metrics) : null;
		if (fCache != null) {
			fCache.startCache(mode, curator);
		}
	}

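	/**
	 * Returns a {@link Consumer} for the given (topic, group, consumer id)
	 * triple, serving it from the cache when one exists. On a cache miss, a
	 * cluster-wide Curator lock guards creation of the underlying Kafka
	 * connector so that only one API node builds it at a time.
	 *
	 * A hypothetical call (names illustrative only):
	 * <pre>
	 * final Consumer c = factory.getConsumerFor("myTopic", "myGroup", "client-1", 30000);
	 * </pre>
	 */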
	@Override
	public Consumer getConsumerFor(String topic, String consumerGroupName,
			String consumerId, int timeoutMs) throws UnavailableException {
		KafkaConsumer kc;

		try {
			kc = (fCache != null) ? fCache.getConsumerFor(topic,
					consumerGroupName, consumerId) : null;
		} catch (KafkaConsumerCacheException e) {
			throw new UnavailableException(e);
		}

		if (kc == null) {

			final InterProcessMutex ipLock = new InterProcessMutex(
					ConfigurationReader.getCurator(), "/consumerFactory/"
							+ topic + "/" + consumerGroupName + "/" + consumerId);
			boolean locked = false;
			try {
				locked = ipLock.acquire(30, TimeUnit.SECONDS);
				if (!locked) {
					// FIXME: this seems to cause trouble in some cases. This exception
					// gets thrown routinely. Possibly a consumer trying multiple servers
					// at once, producing a never-ending cycle of overlapping locks?
					// The problem is that it throws and winds up sending a 503 to the
					// client, which would be incorrect if the client is causing trouble
					// by switching back and forth.

					throw new UnavailableException(
							"Could not acquire lock in order to create (topic, group, consumer) = "
									+ "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")");
				}

				// guard: fCache is null when caching is disabled
				if (fCache != null) {
					fCache.signalOwnership(topic, consumerGroupName, consumerId);
				}

				log.info("Creating Kafka consumer for group ["
						+ consumerGroupName + "], consumer [" + consumerId
						+ "], on topic [" + topic + "].");

				final String fakeGroupName = consumerGroupName + "--" + topic;

				final ConsumerConfig ccc = createConsumerConfig(fakeGroupName,
						consumerId);
				final ConsumerConnector cc = kafka.consumer.Consumer
						.createJavaConsumerConnector(ccc);
				kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc);

				if (fCache != null) {
					fCache.putConsumerFor(topic, consumerGroupName, consumerId,
							kc);
				}
			} catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
				log.error("ZooKeeper timeout in getConsumerFor(" + topic + ", "
						+ consumerGroupName + ", " + consumerId + "): " + x);
				throw new UnavailableException("Couldn't connect to ZK.");
			} catch (KafkaConsumerCacheException e) {
				log.error("Failed to cache consumer (this may have performance implications): "
						+ e.getMessage());
			} catch (Exception e) {
				throw new UnavailableException(
						"Error while acquiring consumer factory lock", e);
			} finally {
				if (locked) {
					try {
						ipLock.release();
					} catch (Exception e) {
						throw new UnavailableException("Error while releasing consumer factory lock", e);
					}
				}
			}
		}

		return kc;
	}

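	/**
	 * Drops the given consumer from the cache; a no-op when caching is
	 * disabled.
	 */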
	@Override
	public synchronized void destroyConsumer(String topic,
			String consumerGroup, String clientId) {
		if (fCache != null) {
			fCache.dropConsumer(topic, consumerGroup, clientId);
		}
	}

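	/**
	 * Returns all consumers currently held in the cache, or an empty
	 * collection when caching is disabled.
	 */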
	@Override
	public synchronized Collection<? extends Consumer> getConsumers() {
		// fCache is null when caching is disabled; report no consumers
		// rather than throwing a NullPointerException.
		return (fCache != null) ? fCache.getConsumers() : Collections.<Consumer>emptyList();
	}

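	/**
	 * Drops every consumer currently held in the cache.
	 */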
	@Override
	public synchronized void dropCache() {
		if (fCache != null) {
			fCache.dropAllConsumers();
		}
	}

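	/**
	 * Builds the Kafka {@link ConsumerConfig} for one consumer: the
	 * ZooKeeper connection, group and consumer ids, the internal defaults,
	 * and any "kafka."-prefixed overrides from the property file.
	 */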
	private ConsumerConfig createConsumerConfig(String groupId,
			String consumerId) {
		final Properties props = new Properties();
		props.put("zookeeper.connect", fZooKeeper);
		props.put("group.id", groupId);
		props.put("consumer.id", consumerId);

		// additional settings: start with our defaults, then pull in
		// configured overrides
		props.putAll(KafkaInternalDefaults);
		for (String key : KafkaConsumerKeys) {
			transferSettingIfProvided(props, key, "kafka");
		}

		return new ConsumerConfig(props);
	}

	/** Consumer cache; null when caching is disabled. */
	private final KafkaConsumerCache fCache;

	/** Kafka/ZooKeeper connection string resolved in the constructor. */
	private String fZooKeeper;

	private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper";

	private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<>();

	/**
	 * Seeds KafkaInternalDefaults with the consumer, ZooKeeper and commit
	 * timeouts configured in the MsgRtrApi property file.
	 */
	public static void populateKafkaInternalDefaultsMap() {
		try {
			final HashMap<String, String> map1 = com.att.ajsc.filemonitor.AJSCPropertiesMap
					.getProperties(CambriaConstants.msgRtr_prop);

			final String[] keys = { "consumer.timeout.ms",
					"zookeeper.connection.timeout.ms",
					"zookeeper.session.timeout.ms", "zookeeper.sync.time.ms",
					"auto.commit.interval.ms", "fetch.message.max.bytes",
					"auto.commit.enable" };
			for (String key : keys) {
				final String value = map1.get(key);
				// skip missing entries; Properties.putAll() rejects null values
				if (value != null) {
					KafkaInternalDefaults.put(key, value);
				}
			}
		} catch (Exception e) {
			log.error("Failed to load Kafka Internal Properties.", e);
		}
	}

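	/**
	 * Kafka consumer settings that may be overridden from the property file
	 * via a "kafka."-prefixed key; see transferSettingIfProvided().
	 */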
	private static final String[] KafkaConsumerKeys = { "socket.timeout.ms",
			"socket.receive.buffer.bytes", "fetch.message.max.bytes",
			"auto.commit.interval.ms", "queued.max.message.chunks",
			"rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes",
			"rebalance.backoff.ms", "refresh.leader.backoff.ms",
			"auto.offset.reset", "consumer.timeout.ms",
			"zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms",
			"zookeeper.sync.time.ms" };

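	/**
	 * Builds the prefixed property-file key, e.g. ("socket.timeout.ms",
	 * "kafka") becomes "kafka.socket.timeout.ms".
	 */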
	private static String makeLongKey(String key, String prefix) {
		return prefix + "." + key;
	}

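	/**
	 * Copies a setting into the target properties under its short key, but
	 * only when the prefixed key is present in the property file.
	 */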
	private void transferSettingIfProvided(Properties target, String key,
			String prefix) {
		final String keyVal = AJSCPropertiesMap.getProperty(
				CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));

		if (null != keyVal) {
			log.info("Setting [" + key + "] to " + keyVal + ".");
			target.put(key, keyVal);
		}
	}

}