b66a251adb983f38e7784bd5d1f2f6d4f4d02fe3
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / Kafka011Consumer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
23
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36
37 import org.apache.commons.lang.StringUtils;
38 import org.apache.kafka.clients.consumer.ConsumerRecord;
39 import org.apache.kafka.clients.consumer.ConsumerRecords;
40 import org.apache.kafka.clients.consumer.KafkaConsumer;
41 import org.apache.kafka.common.KafkaException;
42
43 import org.onap.dmaap.dmf.mr.backends.Consumer;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45
46 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
47 import com.att.eelf.configuration.EELFLogger;
48 import com.att.eelf.configuration.EELFManager;
49
50 /**
51  * A consumer instance that's created per-request. These are stateless so that
52  * clients can connect to this service as a proxy.
53  * 
54  * @author Ram
55  *
56  */
57 public class Kafka011Consumer implements Consumer {
58         private enum State {
59                 OPENED, CLOSED
60         }
61
62         
63         /**
64          * KafkaConsumer() is constructor. It has following 4 parameters:-
65          * 
66          * @param topic
67          * @param group
68          * @param id
69          * @param cc
70          * 
71          */
72
73         public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
74                         KafkaLiveLockAvoider2 klla) throws Exception {
75                 fTopic = topic;
76                 fGroup = group;
77                 fId = id;
78                 fCreateTimeMs = System.currentTimeMillis();
79                 fLastTouch = fCreateTimeMs;
80                 fPendingMsgs = new LinkedBlockingQueue<>();
81                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
82                 offset = 0;
83                 state = Kafka011Consumer.State.OPENED;
84                 kConsumer = cc;
85                 fKafkaLiveLockAvoider = klla;
86                 
87                 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
88                                 "consumer.timeout");
89                 if (StringUtils.isNotEmpty(consumerTimeOut)) {
90                         consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
91                 }
92                 synchronized (kConsumer) {
93                         kConsumer.subscribe(Arrays.asList(topic));
94                 }
95         }
96
97         private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
98                 return new Consumer.Message() {
99                         @Override
100                         public long getOffset() {
101                                 offset = msg.offset();
102                                 return offset;
103                         }
104
105                         @Override
106                         public String getMessage() {
107                                 return new String(msg.value());
108                         }
109                 };
110         }
111
112         @Override
113         public synchronized Consumer.Message nextMessage() {
114
115                 try {
116                         if (fPendingMsgs.isEmpty()) {
117                                 return makeMessage(fPendingMsgs.take());
118                         }
119                 } catch (InterruptedException x) {
120                         log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
121                                         x);
122                 }
123
124                 Callable<Boolean> run = new Callable<Boolean>() {
125                         @Override
126                         public Boolean call() throws Exception {
127                                 try {
128                                         ConsumerRecords<String, String> records;
129                                         synchronized (kConsumer) {
130                                                 records = kConsumer.poll(500);
131                                         }
132                                         for (ConsumerRecord<String, String> record : records) {
133                                                 
134                                                 fPendingMsgs.offer(record);
135                                         }
136
137                                 } catch (KafkaException x) {
138                                         log.debug(fLogTag + ": KafkaException " + x.getMessage());
139
140                                 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
141                                         log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
142                                                         + x.getMessage());
143
144                                 }
145
146                                 
147                                 return true;
148                         }
149                 };
150
151                 @SuppressWarnings({ "rawtypes", "unchecked" })
152                 RunnableFuture future = new FutureTask(run);
153                 ExecutorService service = Executors.newSingleThreadExecutor();
154                 service.execute(future);
155                 try {
156                         future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
157                         // second
158                 } catch (TimeoutException ex) {
159                         // timed out. Try to stop the code if possible.
160                         String apiNodeId = null;
161                         try {
162                                 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
163                         } catch (UnknownHostException e1) {
164                                 // TODO Auto-generated catch block
165                                 log.error("unable to get the localhost address");
166                         }
167
168                         try {
169                                 if (fKafkaLiveLockAvoider != null)
170                                         fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
171                         } catch (Exception e) {
172                                 log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
173                         }
174
175                         forcePollOnConsumer();
176                         future.cancel(true);
177                 } catch (Exception ex) {
178                         // timed out. Try to stop the code if possible.
179                         future.cancel(true);
180                 }
181                 service.shutdown();
182
183                 return null;
184
185         }
186
187         /**
188          * getName() method returns string type value. returns 3 parameters in
189          * string:- fTopic,fGroup,fId
190          * 
191          * @Override
192          */
193         public String getName() {
194                 return fTopic + " : " + fGroup + " : " + fId;
195         }
196
197         /**
198          * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
199          * variable value
200          * 
201          * @Override
202          * 
203          */
204         public long getCreateTimeMs() {
205                 return fCreateTimeMs;
206         }
207
208         public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
209                 return kConsumer;
210         }
211
212         /**
213          * getLastAccessMs() method returns long type value. returns fLastTouch
214          * variable value
215          * 
216          * @Override
217          * 
218          */
219         public long getLastAccessMs() {
220                 return fLastTouch;
221         }
222
223         /**
224          * getOffset() method returns long type value. returns offset variable value
225          * 
226          * @Override
227          * 
228          */
229         public long getOffset() {
230                 return offset;
231         }
232
233         /**
234          * commit offsets commitOffsets() method will be called on closed of
235          * KafkaConsumer.
236          * 
237          * @Override
238          * 
239          *
240          *                      public void commitOffsets() { if (getState() ==
241          *           KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
242          *           on closed KafkaConsumer " + getName()); return; }
243          *           fConnector.commitOffsets(); }
244          */
245
246         /**
247          * updating fLastTouch with current time in ms
248          */
249         public void touch() {
250                 fLastTouch = System.currentTimeMillis();
251         }
252
253         /**
254          * getLastTouch() method returns long type value. returns fLastTouch
255          * variable value
256          * 
257          */
258         public long getLastTouch() {
259                 return fLastTouch;
260         }
261
262         /**
263          * setting the kafkaConsumer state to closed
264          */
265         
266         public boolean close() {
267                 if (getState() == Kafka011Consumer.State.CLOSED) {
268
269                         log.error("close() called on closed KafkaConsumer " + getName());
270                         return true;
271                 }
272
273                 
274                 boolean retVal = kafkaConnectorshuttask();
275                 return retVal;
276
277         }
278
279         /* time out if the kafka shutdown fails for some reason */
280
281         private boolean kafkaConnectorshuttask() {
282                 Callable<Boolean> run = new Callable<Boolean>() {
283                         @Override
284                         public Boolean call() throws Exception {
285
286                                 try {
287                                         
288                                         kConsumer.close();
289
290                                 } catch (Exception e) {
291                                         log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
292                                         throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
293                                         
294                                 }
295                                 log.info("Kafka connection closure with in 15 seconds by a Executors task");
296
297                                 return true;
298                         }
299                 };
300
301                 @SuppressWarnings({ "rawtypes", "unchecked" })
302                 RunnableFuture future = new FutureTask(run);
303                 ExecutorService service = Executors.newSingleThreadExecutor();
304                 service.execute(future);
305                 try {
306                    future.get(200, TimeUnit.SECONDS); // wait 1
307                         // second
308                 } catch (TimeoutException ex) {
309                         // timed out. Try to stop the code if possible.
310                         log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
311                         future.cancel(true);
312                         setState(Kafka011Consumer.State.OPENED);
313                 } catch (Exception ex) {
314                         // timed out. Try to stop the code if possible.
315                         log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
316                                         + ex);
317                         future.cancel(true);
318                         setState(Kafka011Consumer.State.OPENED);
319                         return false;
320                 }
321                 service.shutdown();
322                 setState(Kafka011Consumer.State.CLOSED);
323                 return true;
324         }
325
326         public void forcePollOnConsumer() {
327                 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
328
329         }
330
331         /**
332          * getConsumerGroup() returns Consumer group
333          * 
334          * @return
335          */
336         public String getConsumerGroup() {
337                 return fGroup;
338         }
339
340         /**
341          * getConsumerId returns Consumer Id
342          * 
343          * @return
344          */
345         public String getConsumerId() {
346                 return fId;
347         }
348
349         /**
350          * getState returns kafkaconsumer state
351          * 
352          * @return
353          */
354         private Kafka011Consumer.State getState() {
355                 return this.state;
356         }
357
358         /**
359          * setState() sets the kafkaConsumer state
360          * 
361          * @param state
362          */
363         private void setState(Kafka011Consumer.State state) {
364                 this.state = state;
365         }
366
367         
368         private final String fTopic;
369         private final String fGroup;
370         private final String fId;
371         private final String fLogTag;
372         
373         private KafkaConsumer<String, String> kConsumer;
374         private long fCreateTimeMs;
375         private long fLastTouch;
376         private long offset;
377         private Kafka011Consumer.State state;
378         private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
379         private int consumerPollTimeOut=5;
380         private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
381         private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
382         
383         @Override
384         public void commitOffsets() {
385                 if (getState() == Kafka011Consumer.State.CLOSED) {
386                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
387                         return;
388                 }
389                 kConsumer.commitSync();
390                 
391
392         }
393
394         @Override
395         public void setOffset(long offsetval) {
396                 offset = offsetval;
397         }
398
399         
400         public void setConsumerCache(KafkaConsumerCache cache) {
401         }
402
403         
404 }