fixing code smells
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / Kafka011Consumer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
23
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.Arrays;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.FutureTask;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.RunnableFuture;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35
36 import org.apache.commons.lang.StringUtils;
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
41
42 import org.onap.dmaap.dmf.mr.backends.Consumer;
43 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
44
45 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
46 import com.att.eelf.configuration.EELFLogger;
47 import com.att.eelf.configuration.EELFManager;
48
49 /**
50  * A consumer instance that's created per-request. These are stateless so that
51  * clients can connect to this service as a proxy.
52  * 
53  * @author Ram
54  *
55  */
56 public class Kafka011Consumer implements Consumer {
57         private enum State {
58                 OPENED, CLOSED
59         }
60
61         
62         /**
63          * KafkaConsumer() is constructor. It has following 4 parameters:-
64          * 
65          * @param topic
66          * @param group
67          * @param id
68          * @param cc
69          * 
70          */
71
72         public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
73                         KafkaLiveLockAvoider2 klla) throws Exception {
74                 fTopic = topic;
75                 fGroup = group;
76                 fId = id;
77                 fCreateTimeMs = System.currentTimeMillis();
78                 fLastTouch = fCreateTimeMs;
79                 fPendingMsgs = new LinkedBlockingQueue<>();
80                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
81                 offset = 0;
82                 state = Kafka011Consumer.State.OPENED;
83                 kConsumer = cc;
84                 fKafkaLiveLockAvoider = klla;
85                 
86                 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
87                                 "consumer.timeout");
88                 if (StringUtils.isNotEmpty(consumerTimeOut)) {
89                         consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
90                 }
91                 synchronized (kConsumer) {
92                         kConsumer.subscribe(Arrays.asList(topic));
93                 }
94         }
95
96         private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
97                 return new Consumer.Message() {
98                         @Override
99                         public long getOffset() {
100                                 offset = msg.offset();
101                                 return offset;
102                         }
103
104                         @Override
105                         public String getMessage() {
106                                 return new String(msg.value());
107                         }
108                 };
109         }
110
111         @Override
112         public synchronized Consumer.Message nextMessage() {
113
114                 try {
115                         if (!fPendingMsgs.isEmpty()) {
116                                 return makeMessage(fPendingMsgs.take());
117                         }
118                 } catch (InterruptedException x) {
119                         log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
120                                         x);
121                         Thread.currentThread().interrupt();
122                 }
123
124                 Callable<Boolean> run = new Callable<Boolean>() {
125                         @Override
126                         public Boolean call() throws Exception {
127                                 try {
128                                         ConsumerRecords<String, String> records;
129                                         synchronized (kConsumer) {
130                                                 records = kConsumer.poll(500);
131                                         }
132                                         for (ConsumerRecord<String, String> record : records) {
133                                                 
134                                                 fPendingMsgs.offer(record);
135                                         }
136
137                                 } catch (KafkaException x) {
138                                         log.debug(fLogTag + ": KafkaException ", x);
139
140                                 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
141                                         log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
142
143                                 }
144
145                                 
146                                 return true;
147                         }
148                 };
149
150                 @SuppressWarnings({ "rawtypes", "unchecked" })
151                 RunnableFuture future = new FutureTask(run);
152                 ExecutorService service = Executors.newSingleThreadExecutor();
153                 service.execute(future);
154                 try {
155                         future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
156                         // second
157                 } catch (TimeoutException ex) {
158                 log.error("TimeoutException in in Kafka consumer ", ex);
159                         // timed out. Try to stop the code if possible.
160                         String apiNodeId = null;
161                         try {
162                                 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
163                         } catch (UnknownHostException e1) {
164                                 log.error("unable to get the localhost address ", e1);
165                         }
166
167                         try {
168                                 if (fKafkaLiveLockAvoider != null)
169                                         fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
170                         } catch (Exception e) {
171                                 log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
172                         }
173
174                         forcePollOnConsumer();
175                         future.cancel(true);
176                 } catch (Exception ex) {
177             log.error("Exception in in Kafka consumer ", ex);
178                         // timed out. Try to stop the code if possible.
179                         future.cancel(true);
180                 }
181                 service.shutdown();
182
183                 return null;
184
185         }
186
187         /**
188          * getName() method returns string type value. returns 3 parameters in
189          * string:- fTopic,fGroup,fId
190          * 
191          * @Override
192          */
193         public String getName() {
194                 return fTopic + " : " + fGroup + " : " + fId;
195         }
196
197         /**
198          * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
199          * variable value
200          * 
201          * @Override
202          * 
203          */
204         public long getCreateTimeMs() {
205                 return fCreateTimeMs;
206         }
207
208         public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
209                 return kConsumer;
210         }
211
212         /**
213          * getLastAccessMs() method returns long type value. returns fLastTouch
214          * variable value
215          * 
216          * @Override
217          * 
218          */
219         public long getLastAccessMs() {
220                 return fLastTouch;
221         }
222
223         /**
224          * getOffset() method returns long type value. returns offset variable value
225          * 
226          * @Override
227          * 
228          */
229         public long getOffset() {
230                 return offset;
231         }
232
233         /**
234          * commit offsets commitOffsets() method will be called on closed of
235          * KafkaConsumer.
236          * 
237          * @Override
238          * 
239          *
240          *                      public void commitOffsets() { if (getState() ==
241          *           KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
242          *           on closed KafkaConsumer " + getName()); return; }
243          *           fConnector.commitOffsets(); }
244          */
245
246         /**
247          * updating fLastTouch with current time in ms
248          */
249         public void touch() {
250                 fLastTouch = System.currentTimeMillis();
251         }
252
253         /**
254          * getLastTouch() method returns long type value. returns fLastTouch
255          * variable value
256          * 
257          */
258         public long getLastTouch() {
259                 return fLastTouch;
260         }
261
262         /**
263          * setting the kafkaConsumer state to closed
264          */
265         
266         public boolean close() {
267                 if (getState() == Kafka011Consumer.State.CLOSED) {
268
269                         log.error("close() called on closed KafkaConsumer " + getName());
270                         return true;
271                 }
272
273                 
274                 boolean retVal = kafkaConnectorshuttask();
275                 return retVal;
276
277         }
278
279         /* time out if the kafka shutdown fails for some reason */
280
281         private boolean kafkaConnectorshuttask() {
282                 Callable<Boolean> run = new Callable<Boolean>() {
283                         @Override
284                         public Boolean call() throws Exception {
285
286                                 try {
287                                         
288                                         kConsumer.close();
289
290                                 } catch (Exception e) {
291                                         log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
292                                         throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
293                                         
294                                 }
295                                 log.info("Kafka connection closure with in 15 seconds by a Executors task");
296
297                                 return true;
298                         }
299                 };
300
301                 @SuppressWarnings({ "rawtypes", "unchecked" })
302                 RunnableFuture future = new FutureTask(run);
303                 ExecutorService service = Executors.newSingleThreadExecutor();
304                 service.execute(future);
305                 try {
306                    future.get(200, TimeUnit.SECONDS); // wait 1
307                         // second
308                 } catch (TimeoutException ex) {
309                         // timed out. Try to stop the code if possible.
310                         log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
311                         future.cancel(true);
312                         setState(Kafka011Consumer.State.OPENED);
313                 } catch (Exception ex) {
314                         // timed out. Try to stop the code if possible.
315                         log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
316                         future.cancel(true);
317                         setState(Kafka011Consumer.State.OPENED);
318                         return false;
319                 }
320                 service.shutdown();
321                 setState(Kafka011Consumer.State.CLOSED);
322                 return true;
323         }
324
325         public void forcePollOnConsumer() {
326                 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
327
328         }
329
330         /**
331          * getConsumerGroup() returns Consumer group
332          * 
333          * @return
334          */
335         public String getConsumerGroup() {
336                 return fGroup;
337         }
338
339         /**
340          * getConsumerId returns Consumer Id
341          * 
342          * @return
343          */
344         public String getConsumerId() {
345                 return fId;
346         }
347
348         /**
349          * getState returns kafkaconsumer state
350          * 
351          * @return
352          */
353         private Kafka011Consumer.State getState() {
354                 return this.state;
355         }
356
357         /**
358          * setState() sets the kafkaConsumer state
359          * 
360          * @param state
361          */
362         private void setState(Kafka011Consumer.State state) {
363                 this.state = state;
364         }
365
366         
367         private final String fTopic;
368         private final String fGroup;
369         private final String fId;
370         private final String fLogTag;
371         
372         private KafkaConsumer<String, String> kConsumer;
373         private long fCreateTimeMs;
374         private long fLastTouch;
375         private long offset;
376         private Kafka011Consumer.State state;
377         private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
378         private int consumerPollTimeOut=5;
379         private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
380         private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
381         
382         @Override
383         public void commitOffsets() {
384                 if (getState() == Kafka011Consumer.State.CLOSED) {
385                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
386                         return;
387                 }
388                 kConsumer.commitSync();
389                 
390
391         }
392
393         @Override
394         public void setOffset(long offsetval) {
395                 offset = offsetval;
396         }
397
398         
399         public void setConsumerCache(KafkaConsumerCache cache) {
400         }
401
402         
403 }