[MR] Add support for configuring jaas.sasl.config at runtime
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaConsumerFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
23
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
28 import org.apache.curator.framework.CuratorFramework;
29 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
30 import org.apache.kafka.clients.admin.AdminClientConfig;
31 import org.apache.kafka.clients.consumer.KafkaConsumer;
32 import org.onap.dmaap.dmf.mr.CambriaApiException;
33 import org.onap.dmaap.dmf.mr.backends.Consumer;
34 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
35 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
36 import org.onap.dmaap.dmf.mr.backends.kafka.*;
37 import org.onap.dmaap.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
38 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
39 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
40 import org.onap.dmaap.dmf.mr.utils.Utils;
41 import org.springframework.beans.factory.annotation.Qualifier;
42
43 import java.net.InetAddress;
44 import java.net.UnknownHostException;
45 import java.util.Collection;
46 import java.util.HashMap;
47 import java.util.Properties;
48 import java.util.concurrent.TimeUnit;
49
/**
 * Factory that hands out Kafka 0.11 consumers for the message router,
 * optionally backed by the shared {@link KafkaConsumerCache} (enabled via
 * the "cambria.consumer.cache.enabled" property).
 *
 * @author nilanjana.maity
 */
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {


	private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
58
59
60         /**
61          * constructor initialization
62          *
63          * @param settings
64          * @param metrics
65          * @param curator
66          * @throws missingReqdSetting
67          * @throws KafkaConsumerCacheException
68          * @throws UnknownHostException
69          */
70
71         public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
72                                                                          @Qualifier("curator") CuratorFramework curator,
73                                                                          @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
74                         throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
75
76                 String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
77                                 CambriaConstants.kSetting_ApiNodeIdentifier);
78                 if (apiNodeId == null) {
79
80                         apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
81                 }
82
83                 log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
84                 final String mode = CambriaConstants.DMAAP;
85
86                 fkafkaBrokers = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
87                                 "kafka.metadata.broker.list");
88                 if (null == fkafkaBrokers) {
89
90                         fkafkaBrokers = "localhost:9092";
91                 }
92
93                 boolean kSetting_EnableCache = kDefault_IsCacheEnabled;
94                 String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
95                                 "cambria.consumer.cache.enabled");
96                 if (null != strkSetting_EnableCache)
97                         kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
98
99                 final boolean isCacheEnabled = kSetting_EnableCache;
100
101
102                 fCache = null;
103                 if (isCacheEnabled) {
104                         fCache = KafkaConsumerCache.getInstance();
105
106                 }
107                 if (fCache != null) {
108                         fCache.setfMetrics(metrics);
109                         fCache.setfApiId(apiNodeId);
110                         fCache.startCache(mode, curator);
111                         if(kafkaLiveLockAvoider!=null){
112                                 kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
113                                 fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
114                         }
115                 }
116         }
117
118         /*
119          * getConsumerFor
120          *
121          * @see
122          * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
123          * java.lang.String, java.lang.String, int, java.lang.String) This method is
124          * used by EventServiceImpl.getEvents() method to get a Kakfa consumer
125          * either from kafkaconsumer cache or create a new connection This also get
126          * the list of other consumer objects for the same consumer group and set to
127          * KafkaConsumer object. This list may be used during poll-rebalancing
128          * issue.
129          */
130         @Override
131         public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
132                                                                    String remotehost) throws UnavailableException, CambriaApiException {
133                 Kafka011Consumer kc;
134
135                 // To synchronize based on the consumer group.
136
137                 Object syncObject = synchash.get(topic + consumerGroupName);
138                 if (null == syncObject) {
139                         syncObject = new Object();
140                         synchash.put(topic + consumerGroupName, syncObject);
141                 }
142
143                 synchronized (syncObject) {
144                         try {
145                                 kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null; // consumerId
146
147                         } catch (KafkaConsumerCacheException e) {
148                                 log.info("######@@@@### Error occured in Kafka Caching" + e + "  " + topic + "::" + consumerGroupName
149                                                 + "::" + consumerId);
150                                 log.error("####@@@@## Error occured in Kafka Caching" + e + "  " + topic + "::" + consumerGroupName
151                                                 + "::" + consumerId);
152                                 throw new UnavailableException(e);
153                         }
154
155                         // Ideally if cache exists below flow should be skipped. If cache
156                         // didnt
157                         // exist, then create this first time on this node.
158                         if (kc == null) {
159
160                                 log.info("^Kafka consumer cache value " + topic + "::" + consumerGroupName + "::" + consumerId + " =>"
161                                                 + kc);
162
163                                 final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
164                                                 "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
165                                 boolean locked = false;
166
167                                 try {
168
169                                         locked = ipLock.acquire(30, TimeUnit.SECONDS);
170                                         if (!locked) {
171
172                                                 log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
173                                                                 + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost);
174                                                 throw new UnavailableException(
175                                                                 "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
176                                                                                 + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost);
177                                         }
178
179                                         // ConfigurationReader.getCurator().checkExists().forPath("S").
180
181                                         log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
182                                                         + "], on topic [" + topic + "].");
183
184                                         if (fCache != null) {
185                                                 fCache.signalOwnership(topic, consumerGroupName, consumerId);
186                                         }
187
188                                         final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
189                                         long fCreateTimeMs = System.currentTimeMillis();
190                                         KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
191                                         kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
192                                         log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
193
194                                         if (fCache != null) {
195                                                 fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc); //
196                                         }
197
198                                 } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
199                                         log.info(
200                                                         "Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId);
201                                         throw new UnavailableException("Couldn't connect to ZK.");
202                                 } catch (KafkaConsumerCacheException e) {
203                                         log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage()
204                                                         + " " + consumerGroupName + "/" + consumerId);
205                                 } catch (UnavailableException u) {
206                                         log.info("Failed and in UnavailableException block " + u.getMessage() + " " + consumerGroupName
207                                                         + "/" + consumerId);
208                                         throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u);
209                                 } catch (Exception e) {
210                                         log.info("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
211                                                         + consumerId);
212                                         log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
213                                                         + consumerId);
214
215                                 } finally {
216                                         if (locked) {
217                                                 try {
218                                                         ipLock.release();
219                                                 } catch (Exception e) {
220                                                         log.error("Error while releasing consumer factory lock", e);
221                                                 }
222                                         }
223                                 }
224                         }
225                 }
226                 return kc;
227         }
228
229         @Override
230         public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) {
231                 if (fCache != null) {
232                         fCache.dropConsumer(topic, consumerGroup, clientId);
233                 }
234         }
235
236         @Override
237         public synchronized Collection<? extends Consumer> getConsumers() {
238                 return fCache.getConsumers();
239         }
240
241         @Override
242         public synchronized void dropCache() {
243                 fCache.dropAllConsumers();
244         }
245
246
	// Shared consumer cache; null when "cambria.consumer.cache.enabled" is false.
	private KafkaConsumerCache fCache;
	// Live-lock avoider registered in the constructor; may remain null.
	private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
	// Kafka bootstrap broker list ("kafka.metadata.broker.list", default localhost:9092).
	private String fkafkaBrokers;
250
251
252
253         private static String makeLongKey(String key, String prefix) {
254                 return prefix + "." + key;
255         }
256
257         private void transferSettingIfProvided(Properties target, String key, String prefix) {
258                 String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
259
260
261                 if (null != keyVal) {
262
263                         log.info("Setting [" + key + "] to " + keyVal + ".");
264                         target.put(key, keyVal);
265                 }
266         }
267
268         /**
269          * Name CreateConsumerconfig
270          * @param topic
271          * @param groupId
272          * @param consumerId
273          * @return Properties
274          *
275          * This method is to create Properties required to create kafka connection
276          * Group name is replaced with different format groupid--topic to address same
277          * groupids for multiple topics. Same groupid with multiple topics
278          * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
279          */
280         private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
281                 final Properties props = new Properties();
282                 //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
283                 //Fix for CPFMF-644 :
284                 final String fakeGroupName = groupId + "--" + topic;
285                 props.put("group.id", fakeGroupName);
286                 props.put("enable.auto.commit", "false"); // 0.11
287                 props.put("bootstrap.servers", fkafkaBrokers);
288                 if(Utils.isCadiEnabled()){
289                         props.putAll(Utils.addSaslProps());
290                 }
291                 props.put("client.id", consumerId);
292
293                 // additional settings: start with our defaults, then pull in configured
294                 // overrides
295                 populateKafkaInternalDefaultsMap();
296                 for (String key : KafkaConsumerKeys) {
297                         transferSettingIfProvided(props, key, "kafka");
298                 }
299
300                 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
301                 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
302
303                 return props;
304         }
305
306
307         private static final String KafkaConsumerKeys[] = { "bootstrap.servers", "heartbeat.interval.ms",
308                         "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes",
309                         "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level",
310                         "fetch.max.bytes", "request.timeout.ms", "fetch.max.wait.bytes", "reconnect.backoff.max.ms",
311                         "max.partition.fetch.bytes", "reconnect.backoff.max.ms", "reconnect.backoff.ms", "retry.backoff.ms",
312                         "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes", "metadata.max.age.ms" };
313
	/**
	 * Hook for seeding internal Kafka defaults (consumer timeout, zookeeper
	 * timeout, etc.) before configured overrides are applied. Currently an
	 * intentional no-op; createConsumerConfig still calls it so defaults can
	 * be reintroduced without touching that flow.
	 */
	private static void populateKafkaInternalDefaultsMap() { }
320
	/**
	 * Builds the live-lock avoidance callback registered for this API node.
	 * When a rebalance unlock is signalled for a group, it forces an immediate
	 * poll on every consumer whose name starts with "groupName::" (see
	 * Kafka011ConsumerUtil.forcePollOnConsumer).
	 *
	 * @param appId identifier this node reports via getAppId()
	 */
	private LiveLockAvoidance makeAvoidanceCallback(final String appId) {

		return new LiveLockAvoidance() {

			@Override
			public String getAppId() {
				return appId;
			}

			@Override
			public void handleRebalanceUnlock(String groupName) {
				log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName);
				// "groupName::" prefix selects all consumers of that group.
				Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::");
			}

		};

	}
344
	/**
	 * Not implemented: always returns null. NOTE(review): left as the original
	 * auto-generated stub; callers must not rely on this path.
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
										  String remotehost) throws UnavailableException, CambriaApiException {
		// TODO Auto-generated method stub
		return null;
	}
352
353         private HashMap<String, Object> synchash = new HashMap<String, Object>();
354
355 }