update the testcases after the kafka 11 changes
[dmaap/messagerouter/msgrtr.git] / src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.dmf.mr.beans;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.dmf.mr.CambriaApiException;
import com.att.dmf.mr.backends.Consumer;
import com.att.dmf.mr.backends.ConsumerFactory;
import com.att.dmf.mr.backends.MetricsSet;
import com.att.dmf.mr.backends.kafka.Kafka011Consumer;
import com.att.dmf.mr.backends.kafka.Kafka011ConsumerUtil;
import com.att.dmf.mr.backends.kafka.KafkaConsumerCache;
import com.att.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.utils.ConfigurationReader;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;

/**
 * Factory for Kafka 0.11 consumers: creates Kafka011Consumer instances,
 * optionally caching them in the shared KafkaConsumerCache, and wires in
 * live-lock avoidance.
 *
 * @author nilanjana.maity
 */
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {

    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);

    /**
     * constructor initialization
     *
     * @param metrics
     * @param curator
     * @param kafkaLiveLockAvoider
     * @throws missingReqdSetting
     * @throws KafkaConsumerCacheException
     * @throws UnknownHostException
     */
    public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
            @Qualifier("curator") CuratorFramework curator,
            @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
            throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {

        String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                CambriaConstants.kSetting_ApiNodeIdentifier);
        if (apiNodeId == null) {
            apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
        }

        log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
        final String mode = CambriaConstants.DMAAP;

        fkafkaBrokers = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "kafka.metadata.broker.list");
        if (null == fkafkaBrokers) {
            fkafkaBrokers = "localhost:9092";
        }

        boolean kSetting_EnableCache = kDefault_IsCacheEnabled;
        String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "cambria.consumer.cache.enabled");
        if (null != strkSetting_EnableCache)
            kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);

        final boolean isCacheEnabled = kSetting_EnableCache;

        fCache = isCacheEnabled ? KafkaConsumerCache.getInstance() : null;
        if (fCache != null) {
            fCache.setfMetrics(metrics);
            fCache.setfApiId(apiNodeId);
            fCache.startCache(mode, curator);
            if (kafkaLiveLockAvoider != null) {
                kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
                fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
            }
        }
    }

    /*
     * getConsumerFor
     *
     * @see
     * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
     * java.lang.String, java.lang.String, int, java.lang.String)
     *
     * Used by EventServiceImpl.getEvents() to obtain a Kafka consumer, either
     * from the KafkaConsumer cache or by creating a new connection. It also
     * collects the other consumer objects for the same consumer group and sets
     * them on the KafkaConsumer object; this list may be used when working
     * around poll-rebalancing issues.
     */
    @Override
    public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
            String remotehost) throws UnavailableException, CambriaApiException {
        Kafka011Consumer kc;

        // Serialize consumer creation per (topic, consumer group).
        Object syncObject = synchash.computeIfAbsent(topic + consumerGroupName, k -> new Object());

        synchronized (syncObject) {
            try {
                kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null;

            } catch (KafkaConsumerCacheException e) {
                log.error("Error occurred in Kafka consumer caching: " + e + " " + topic + "::" + consumerGroupName
                        + "::" + consumerId);
                throw new UnavailableException(e);
            }

            // Ideally, if the consumer is cached, the flow below is skipped; if
            // it wasn't cached, create it for the first time on this node.
            if (kc == null) {

                log.info("No cached Kafka consumer for " + topic + "::" + consumerGroupName + "::" + consumerId);

                final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
                        "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
                boolean locked = false;

                try {

                    locked = ipLock.acquire(30, TimeUnit.SECONDS);
                    if (!locked) {
                        log.info("Could not acquire lock in order to create (topic, group, consumer) = (" + topic
                                + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost);
                        throw new UnavailableException(
                                "Could not acquire lock in order to create (topic, group, consumer) = (" + topic
                                        + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost);
                    }

                    log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
                            + "], on topic [" + topic + "].");

                    if (fCache != null) {
                        fCache.signalOwnership(topic, consumerGroupName, consumerId);
                    }

                    final Properties props = createConsumerConfig(topic, consumerGroupName, consumerId);
                    long fCreateTimeMs = System.currentTimeMillis();
                    KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
                    kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
                    log.info("Kafka consumer created in " + (System.currentTimeMillis() - fCreateTimeMs) + " ms");

                    if (fCache != null) {
                        fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc);
                    }

                } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
                    log.info("Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId);
                    throw new UnavailableException("Couldn't connect to ZK.");
                } catch (KafkaConsumerCacheException e) {
                    log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage()
                            + " " + consumerGroupName + "/" + consumerId);
                } catch (UnavailableException u) {
                    log.info("Failed waiting for the consumer factory lock: " + u.getMessage() + " "
                            + consumerGroupName + "/" + consumerId);
                    throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u);
                } catch (Exception e) {
                    log.error("Failed to create Kafka consumer: " + e.getMessage() + " " + consumerGroupName + "/"
                            + consumerId);
                } finally {
                    if (locked) {
                        try {
                            ipLock.release();
                        } catch (Exception e) {
                            throw new UnavailableException("Error while releasing consumer factory lock: " + e, e);
                        }
                    }
                }
            }
        }
        return kc;
    }

    @Override
    public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) {
        if (fCache != null) {
            fCache.dropConsumer(topic, consumerGroup, clientId);
        }
    }

    @Override
    public synchronized Collection<? extends Consumer> getConsumers() {
        return fCache.getConsumers();
    }

    @Override
    public synchronized void dropCache() {
        fCache.dropAllConsumers();
    }

    private KafkaConsumerCache fCache;
    private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
    private String fkafkaBrokers;

    private static String makeLongKey(String key, String prefix) {
        return prefix + "." + key;
    }

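    /*
     * Copies a single configured override into the consumer Properties when it
     * is present in the MsgRtr property file. Illustrative example (property
     * name assumed, not taken from this file): with key "max.poll.records" and
     * prefix "kafka", the value of "kafka.max.poll.records" would be copied
     * into the target Properties under "max.poll.records".
     */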
    private void transferSettingIfProvided(Properties target, String key, String prefix) {
        String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
        if (null != keyVal) {
            log.info("Setting [" + key + "] to " + keyVal + ".");
            target.put(key, keyVal);
        }
    }

    /**
     * Name: createConsumerConfig
     *
     * @param topic
     * @param groupId
     * @param consumerId
     * @return Properties
     *
     * Creates the Properties required to open a Kafka connection. The group
     * name is rewritten to the format groupId--topic so that the same group id
     * can be used across multiple topics; reusing one group id on several
     * topics may trigger frequent consumer rebalancing on all of them, and the
     * rewrite makes each (group, topic) pair unique.
     */
    private Properties createConsumerConfig(String topic, String groupId, String consumerId) {
        final Properties props = new Properties();
        // The fake group name avoids sharing one consumer group across multiple
        // topics; do not change this logic. (Fix for CPFMF-644.)
        final String fakeGroupName = groupId + "--" + topic;
        props.put("group.id", fakeGroupName);
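        // e.g. (illustrative) groupId "group1" on topic "topicA" yields group.id "group1--topicA"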
        props.put("enable.auto.commit", "false"); // 0.11
        props.put("bootstrap.servers", fkafkaBrokers);
        /*props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");*/
        props.put("client.id", consumerId);

        // additional settings: start with our defaults, then pull in configured
        // overrides
        populateKafkaInternalDefaultsMap();
        for (String key : KafkaConsumerKeys) {
            transferSettingIfProvided(props, key, "kafka");
        }

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

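    /*
     * Illustrative result of createConsumerConfig() (hypothetical values): for
     * groupId "g1", topic "t1" and consumerId "c1", the returned Properties
     * contain group.id=g1--t1, client.id=c1, enable.auto.commit=false,
     * bootstrap.servers=<the configured broker list>, the two String
     * deserializers, and any "kafka.*" overrides found in the property file.
     */

    // Standard Kafka consumer setting names that may be overridden via
    // "kafka.<name>" entries in the MsgRtr property file (see
    // transferSettingIfProvided), e.g. kafka.max.poll.records=500 (illustrative value).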
    private static final String[] KafkaConsumerKeys = { "bootstrap.servers", "heartbeat.interval.ms",
            "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes",
            "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level",
            "request.timeout.ms", "fetch.max.wait.ms", "reconnect.backoff.max.ms", "reconnect.backoff.ms",
            "retry.backoff.ms", "max.partition.fetch.bytes", "max.poll.interval.ms", "max.poll.records",
            "receive.buffer.bytes", "metadata.max.age.ms" };

    /**
     * Intended to seed default values (consumer timeout, ZooKeeper timeout,
     * etc.); currently a no-op, as the defaults are set inline in
     * createConsumerConfig().
     */
    private static void populateKafkaInternalDefaultsMap() { }

    /*
     * Builds the LiveLockAvoidance callback for this API node: when a
     * rebalance unlock is signalled for a consumer group, force a poll on the
     * local consumers of that group.
     */
    private LiveLockAvoidance makeAvoidanceCallback(final String appId) {

        return new LiveLockAvoidance() {

            @Override
            public String getAppId() {
                return appId;
            }

            @Override
            public void handleRebalanceUnlock(String groupName) {
                log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName);
                Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::");
            }

        };
    }

    @SuppressWarnings("rawtypes")
    @Override
    public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
            String remotehost) throws UnavailableException, CambriaApiException {
        // TODO Auto-generated method stub
        return null;
    }

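    // Per (topic + consumer group) monitor objects used by getConsumerFor() to
    // serialize consumer creation.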
    private final ConcurrentHashMap<String, Object> synchash = new ConcurrentHashMap<>();

}