// DMaaP-MR — merged MR repos
// dmaap/messagerouter/messageservice: src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
23
import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
import org.onap.dmaap.dmf.mr.backends.MetricsSet;
import org.onap.dmaap.dmf.mr.backends.kafka.*;
import org.onap.dmaap.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
import org.onap.dmaap.dmf.mr.utils.Utils;
import org.springframework.beans.factory.annotation.Qualifier;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
48
49 /**
50  * @author nilanjana.maity
51  *
52  */
53 public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
54
55         
56         private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
57         
58
59         /**
60          * constructor initialization
61          * 
62          * @param settings
63          * @param metrics
64          * @param curator
65          * @throws missingReqdSetting
66          * @throws KafkaConsumerCacheException
67          * @throws UnknownHostException
68          */
69
70         public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
71                         @Qualifier("curator") CuratorFramework curator,
72                         @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
73                         throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
74
75                 String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
76                                 CambriaConstants.kSetting_ApiNodeIdentifier);
77                 if (apiNodeId == null) {
78
79                         apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
80                 }
81
82                 log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
83                 final String mode = CambriaConstants.DMAAP;
84
85                 fkafkaBrokers = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
86                                 "kafka.metadata.broker.list");
87                 if (null == fkafkaBrokers) {
88
89                         fkafkaBrokers = "localhost:9092";
90                 }
91
92                 boolean kSetting_EnableCache = kDefault_IsCacheEnabled;
93                 String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
94                                 "cambria.consumer.cache.enabled");
95                 if (null != strkSetting_EnableCache)
96                         kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
97
98                 final boolean isCacheEnabled = kSetting_EnableCache;
99
100                 
101                 fCache = null;
102                 if (isCacheEnabled) {
103                         fCache = KafkaConsumerCache.getInstance();
104
105                 }
106                 if (fCache != null) {
107                         fCache.setfMetrics(metrics);
108                         fCache.setfApiId(apiNodeId);
109                         fCache.startCache(mode, curator);
110                         if(kafkaLiveLockAvoider!=null){
111                         kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
112                         fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
113                         }
114                 }
115         }
116
117         /*
118          * getConsumerFor
119          * 
120          * @see
121          * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
122          * java.lang.String, java.lang.String, int, java.lang.String) This method is
123          * used by EventServiceImpl.getEvents() method to get a Kakfa consumer
124          * either from kafkaconsumer cache or create a new connection This also get
125          * the list of other consumer objects for the same consumer group and set to
126          * KafkaConsumer object. This list may be used during poll-rebalancing
127          * issue.
128          */
129         @Override
130         public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
131                         String remotehost) throws UnavailableException, CambriaApiException {
132                 Kafka011Consumer kc;
133
134                 // To synchronize based on the consumer group.
135
136                 Object syncObject = synchash.get(topic + consumerGroupName);
137                 if (null == syncObject) {
138                         syncObject = new Object();
139                         synchash.put(topic + consumerGroupName, syncObject);
140                 }
141
142                 synchronized (syncObject) {
143                         try {
144                                 kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null; // consumerId
145
146                         } catch (KafkaConsumerCacheException e) {
147                                 log.info("######@@@@### Error occured in Kafka Caching" + e + "  " + topic + "::" + consumerGroupName
148                                                 + "::" + consumerId);
149                                 log.error("####@@@@## Error occured in Kafka Caching" + e + "  " + topic + "::" + consumerGroupName
150                                                 + "::" + consumerId);
151                                 throw new UnavailableException(e);
152                         }
153
154                         // Ideally if cache exists below flow should be skipped. If cache
155                         // didnt
156                         // exist, then create this first time on this node.
157                         if (kc == null) {
158
159                                 log.info("^Kafka consumer cache value " + topic + "::" + consumerGroupName + "::" + consumerId + " =>"
160                                                 + kc);
161
162                                 final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
163                                                 "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
164                                 boolean locked = false;
165
166                                 try {
167
168                                         locked = ipLock.acquire(30, TimeUnit.SECONDS);
169                                         if (!locked) {
170
171                                                 log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
172                                                                 + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost);
173                                                 throw new UnavailableException(
174                                                                 "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
175                                                                                 + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost);
176                                         }
177
178                                         // ConfigurationReader.getCurator().checkExists().forPath("S").
179
180                                         log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
181                                                         + "], on topic [" + topic + "].");
182                                         
183                                         if (fCache != null) {
184                                                 fCache.signalOwnership(topic, consumerGroupName, consumerId);
185                                         }
186                                         
187                                         final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
188                                         long fCreateTimeMs = System.currentTimeMillis();
189                                         KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
190                                         kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
191                                         log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
192
193                                         if (fCache != null) {
194                                                 fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc); //
195                                         }
196
197                                 } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
198                                         log.info(
199                                                         "Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId);
200                                         throw new UnavailableException("Couldn't connect to ZK.");
201                                 } catch (KafkaConsumerCacheException e) {
202                                         log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage()
203                                                         + " " + consumerGroupName + "/" + consumerId);
204                                 } catch (UnavailableException u) {
205                                         log.info("Failed and in UnavailableException block " + u.getMessage() + " " + consumerGroupName
206                                                         + "/" + consumerId);
207                                         throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u);
208                                 } catch (Exception e) {
209                                         log.info("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
210                                                         + consumerId);
211                                         log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
212                                                         + consumerId);
213                                         
214                                 } finally {
215                                         if (locked) {
216                                                 try {
217                                                         ipLock.release();
218                                                 } catch (Exception e) {
219                                                         log.error("Error while releasing consumer factory lock", e);
220                                                 }
221                                         }
222                                 }
223                         }
224                 }
225                 return kc;
226         }
227
228         @Override
229         public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) {
230                 if (fCache != null) {
231                         fCache.dropConsumer(topic, consumerGroup, clientId);
232                 }
233         }
234
235         @Override
236         public synchronized Collection<? extends Consumer> getConsumers() {
237                 return fCache.getConsumers();
238         }
239
240         @Override
241         public synchronized void dropCache() {
242                 fCache.dropAllConsumers();
243         }
244
245         
246         private KafkaConsumerCache fCache;
247         private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
248         private String fkafkaBrokers;
249
250
251
252         private static String makeLongKey(String key, String prefix) {
253                 return prefix + "." + key;
254         }
255
256         private void transferSettingIfProvided(Properties target, String key, String prefix) {
257                 String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
258
259                 
260                 if (null != keyVal) {
261                 
262                         log.info("Setting [" + key + "] to " + keyVal + ".");
263                         target.put(key, keyVal);
264                 }
265         }
266
267         /**
268          * Name CreateConsumerconfig  
269          * @param topic
270          * @param groupId
271          * @param consumerId
272          * @return Properties
273          * 
274          * This method is to create Properties required to create kafka connection
275          * Group name is replaced with different format groupid--topic to address same 
276          * groupids for multiple topics. Same groupid with multiple topics 
277          * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
278          */
279         private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
280                 final Properties props = new Properties();
281                 //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
282                 //Fix for CPFMF-644 : 
283                 final String fakeGroupName = groupId + "--" + topic;
284                 props.put("group.id", fakeGroupName);
285                 props.put("enable.auto.commit", "false"); // 0.11
286                 props.put("bootstrap.servers", fkafkaBrokers);
287                 if(Utils.isCadiEnabled()){
288                 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
289                 props.put("security.protocol", "SASL_PLAINTEXT");
290                 props.put("sasl.mechanism", "PLAIN");
291                 }
292                 props.put("client.id", consumerId);
293
294                 // additional settings: start with our defaults, then pull in configured
295                 // overrides
296                 populateKafkaInternalDefaultsMap();
297                 for (String key : KafkaConsumerKeys) {
298                         transferSettingIfProvided(props, key, "kafka");
299                 }
300
301                 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
302                 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
303
304                 return props;
305         }
306
307
308         private static final String KafkaConsumerKeys[] = { "bootstrap.servers", "heartbeat.interval.ms",
309                         "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes",
310                         "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level",
311                         "fetch.max.bytes", "request.timeout.ms", "fetch.max.wait.bytes", "reconnect.backoff.max.ms",
312                         "max.partition.fetch.bytes", "reconnect.backoff.max.ms", "reconnect.backoff.ms", "retry.backoff.ms",
313                         "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes", "metadata.max.age.ms" };
314
315         /**
316          * putting values in hashmap like consumer timeout, zookeeper time out, etc
317          * 
318          * @param setting
319          */
320         private static void populateKafkaInternalDefaultsMap() { }
321
322         /*
323          * The starterIncremnt value is just to emulate calling certain consumers,
324          * in this test app all the consumers are local
325          * 
326          */
327         private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
328
329                 return new LiveLockAvoidance() {
330
331                         @Override
332                         public String getAppId() {
333                                 return appId;
334                         }
335
336                         @Override
337                         public void handleRebalanceUnlock(String groupName) {
338                                 log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName);
339                                 Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::");
340                         }
341
342                 };
343
344         }
345
346         @SuppressWarnings("rawtypes")
347         @Override
348         public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
349                         String remotehost) throws UnavailableException, CambriaApiException {
350                 // TODO Auto-generated method stub
351                 return null;
352         }
353
354         private HashMap<String, Object> synchash = new HashMap<String, Object>();
355
356 }