b3f3c8f16d3d358b76e28beb656a3f0f4bcf822e
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / Kafka011Consumer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
23
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36
37 import org.apache.commons.lang.StringUtils;
38 import org.apache.kafka.clients.consumer.ConsumerRecord;
39 import org.apache.kafka.clients.consumer.ConsumerRecords;
40 import org.apache.kafka.clients.consumer.KafkaConsumer;
41 import org.apache.kafka.common.KafkaException;
42
43 import org.onap.dmaap.dmf.mr.backends.Consumer;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45
46 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
47 import com.att.eelf.configuration.EELFLogger;
48 import com.att.eelf.configuration.EELFManager;
49
50 /**
51  * A consumer instance that's created per-request. These are stateless so that
52  * clients can connect to this service as a proxy.
53  * 
54  * @author Ram
55  *
56  */
57 public class Kafka011Consumer implements Consumer {
58         private enum State {
59                 OPENED, CLOSED
60         }
61
62         
63         /**
64          * KafkaConsumer() is constructor. It has following 4 parameters:-
65          * 
66          * @param topic
67          * @param group
68          * @param id
69          * @param cc
70          * 
71          */
72
73         public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
74                         KafkaLiveLockAvoider2 klla) throws Exception {
75                 fTopic = topic;
76                 fGroup = group;
77                 fId = id;
78                 fCreateTimeMs = System.currentTimeMillis();
79                 fLastTouch = fCreateTimeMs;
80                 fPendingMsgs = new LinkedBlockingQueue<>();
81                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
82                 offset = 0;
83                 state = Kafka011Consumer.State.OPENED;
84                 kConsumer = cc;
85                 fKafkaLiveLockAvoider = klla;
86                 
87                 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
88                                 "consumer.timeout");
89                 if (StringUtils.isNotEmpty(consumerTimeOut)) {
90                         consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
91                 }
92                 synchronized (kConsumer) {
93                         kConsumer.subscribe(Arrays.asList(topic));
94                 }
95         }
96
97         private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
98                 return new Consumer.Message() {
99                         @Override
100                         public long getOffset() {
101                                 offset = msg.offset();
102                                 return offset;
103                         }
104
105                         @Override
106                         public String getMessage() {
107                                 return new String(msg.value());
108                         }
109                 };
110         }
111
112         @Override
113         public synchronized Consumer.Message nextMessage() {
114
115                 try {
116                         if (fPendingMsgs.isEmpty()) {
117                                 return makeMessage(fPendingMsgs.take());
118                         }
119                 } catch (InterruptedException x) {
120                         log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
121                                         x);
122                         Thread.currentThread().interrupt();
123                 }
124
125                 Callable<Boolean> run = new Callable<Boolean>() {
126                         @Override
127                         public Boolean call() throws Exception {
128                                 try {
129                                         ConsumerRecords<String, String> records;
130                                         synchronized (kConsumer) {
131                                                 records = kConsumer.poll(500);
132                                         }
133                                         for (ConsumerRecord<String, String> record : records) {
134                                                 
135                                                 fPendingMsgs.offer(record);
136                                         }
137
138                                 } catch (KafkaException x) {
139                                         log.debug(fLogTag + ": KafkaException ", x);
140
141                                 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
142                                         log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
143
144                                 }
145
146                                 
147                                 return true;
148                         }
149                 };
150
151                 @SuppressWarnings({ "rawtypes", "unchecked" })
152                 RunnableFuture future = new FutureTask(run);
153                 ExecutorService service = Executors.newSingleThreadExecutor();
154                 service.execute(future);
155                 try {
156                         future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
157                         // second
158                 } catch (TimeoutException ex) {
159                 log.error("TimeoutException in in Kafka consumer ", ex);
160                         // timed out. Try to stop the code if possible.
161                         String apiNodeId = null;
162                         try {
163                                 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
164                         } catch (UnknownHostException e1) {
165                                 log.error("unable to get the localhost address ", e1);
166                         }
167
168                         try {
169                                 if (fKafkaLiveLockAvoider != null)
170                                         fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
171                         } catch (Exception e) {
172                                 log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
173                         }
174
175                         forcePollOnConsumer();
176                         future.cancel(true);
177                 } catch (Exception ex) {
178             log.error("Exception in in Kafka consumer ", ex);
179                         // timed out. Try to stop the code if possible.
180                         future.cancel(true);
181                 }
182                 service.shutdown();
183
184                 return null;
185
186         }
187
188         /**
189          * getName() method returns string type value. returns 3 parameters in
190          * string:- fTopic,fGroup,fId
191          * 
192          * @Override
193          */
194         public String getName() {
195                 return fTopic + " : " + fGroup + " : " + fId;
196         }
197
198         /**
199          * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
200          * variable value
201          * 
202          * @Override
203          * 
204          */
205         public long getCreateTimeMs() {
206                 return fCreateTimeMs;
207         }
208
209         public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
210                 return kConsumer;
211         }
212
213         /**
214          * getLastAccessMs() method returns long type value. returns fLastTouch
215          * variable value
216          * 
217          * @Override
218          * 
219          */
220         public long getLastAccessMs() {
221                 return fLastTouch;
222         }
223
224         /**
225          * getOffset() method returns long type value. returns offset variable value
226          * 
227          * @Override
228          * 
229          */
230         public long getOffset() {
231                 return offset;
232         }
233
        /*
         * Legacy commitOffsets() implementation, kept for reference only; it is
         * not compiled. The active implementation is the commitOffsets() method of
         * this class.
         *
         *   public void commitOffsets() {
         *       if (getState() == KafkaConsumer.State.CLOSED) {
         *           log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
         *           return;
         *       }
         *       fConnector.commitOffsets();
         *   }
         */
246
247         /**
248          * updating fLastTouch with current time in ms
249          */
250         public void touch() {
251                 fLastTouch = System.currentTimeMillis();
252         }
253
254         /**
255          * getLastTouch() method returns long type value. returns fLastTouch
256          * variable value
257          * 
258          */
259         public long getLastTouch() {
260                 return fLastTouch;
261         }
262
263         /**
264          * setting the kafkaConsumer state to closed
265          */
266         
267         public boolean close() {
268                 if (getState() == Kafka011Consumer.State.CLOSED) {
269
270                         log.error("close() called on closed KafkaConsumer " + getName());
271                         return true;
272                 }
273
274                 
275                 boolean retVal = kafkaConnectorshuttask();
276                 return retVal;
277
278         }
279
280         /* time out if the kafka shutdown fails for some reason */
281
282         private boolean kafkaConnectorshuttask() {
283                 Callable<Boolean> run = new Callable<Boolean>() {
284                         @Override
285                         public Boolean call() throws Exception {
286
287                                 try {
288                                         
289                                         kConsumer.close();
290
291                                 } catch (Exception e) {
292                                         log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
293                                         throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
294                                         
295                                 }
296                                 log.info("Kafka connection closure with in 15 seconds by a Executors task");
297
298                                 return true;
299                         }
300                 };
301
302                 @SuppressWarnings({ "rawtypes", "unchecked" })
303                 RunnableFuture future = new FutureTask(run);
304                 ExecutorService service = Executors.newSingleThreadExecutor();
305                 service.execute(future);
306                 try {
307                    future.get(200, TimeUnit.SECONDS); // wait 1
308                         // second
309                 } catch (TimeoutException ex) {
310                         // timed out. Try to stop the code if possible.
311                         log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
312                         future.cancel(true);
313                         setState(Kafka011Consumer.State.OPENED);
314                 } catch (Exception ex) {
315                         // timed out. Try to stop the code if possible.
316                         log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
317                         future.cancel(true);
318                         setState(Kafka011Consumer.State.OPENED);
319                         return false;
320                 }
321                 service.shutdown();
322                 setState(Kafka011Consumer.State.CLOSED);
323                 return true;
324         }
325
326         public void forcePollOnConsumer() {
327                 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
328
329         }
330
331         /**
332          * getConsumerGroup() returns Consumer group
333          * 
334          * @return
335          */
336         public String getConsumerGroup() {
337                 return fGroup;
338         }
339
340         /**
341          * getConsumerId returns Consumer Id
342          * 
343          * @return
344          */
345         public String getConsumerId() {
346                 return fId;
347         }
348
349         /**
350          * getState returns kafkaconsumer state
351          * 
352          * @return
353          */
354         private Kafka011Consumer.State getState() {
355                 return this.state;
356         }
357
358         /**
359          * setState() sets the kafkaConsumer state
360          * 
361          * @param state
362          */
363         private void setState(Kafka011Consumer.State state) {
364                 this.state = state;
365         }
366
367         
368         private final String fTopic;
369         private final String fGroup;
370         private final String fId;
371         private final String fLogTag;
372         
373         private KafkaConsumer<String, String> kConsumer;
374         private long fCreateTimeMs;
375         private long fLastTouch;
376         private long offset;
377         private Kafka011Consumer.State state;
378         private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
379         private int consumerPollTimeOut=5;
380         private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
381         private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
382         
383         @Override
384         public void commitOffsets() {
385                 if (getState() == Kafka011Consumer.State.CLOSED) {
386                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
387                         return;
388                 }
389                 kConsumer.commitSync();
390                 
391
392         }
393
394         @Override
395         public void setOffset(long offsetval) {
396                 offset = offsetval;
397         }
398
399         
400         public void setConsumerCache(KafkaConsumerCache cache) {
401         }
402
403         
404 }