DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / Kafka011Consumer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
23
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.kafka.clients.consumer.ConsumerRecord;
29 import org.apache.kafka.clients.consumer.ConsumerRecords;
30 import org.apache.kafka.clients.consumer.KafkaConsumer;
31 import org.apache.kafka.common.KafkaException;
32 import org.onap.dmaap.dmf.mr.backends.Consumer;
33 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
34
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.util.Arrays;
38 import java.util.concurrent.*;
39
40 /**
41  * A consumer instance that's created per-request. These are stateless so that
42  * clients can connect to this service as a proxy.
43  * 
44  * @author Ram
45  *
46  */
47 public class Kafka011Consumer implements Consumer {
48         private enum State {
49                 OPENED, CLOSED
50         }
51
52         
53         /**
54          * KafkaConsumer() is constructor. It has following 4 parameters:-
55          * 
56          * @param topic
57          * @param group
58          * @param id
59          * @param cc
60          * 
61          */
62
63         public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
64                         KafkaLiveLockAvoider2 klla) throws Exception {
65                 fTopic = topic;
66                 fGroup = group;
67                 fId = id;
68                 fCreateTimeMs = System.currentTimeMillis();
69                 fLastTouch = fCreateTimeMs;
70                 fPendingMsgs = new LinkedBlockingQueue<>();
71                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
72                 offset = 0;
73                 state = State.OPENED;
74                 kConsumer = cc;
75                 fKafkaLiveLockAvoider = klla;
76
77                 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
78                                 "consumer.timeout");
79                 if (StringUtils.isNotEmpty(consumerTimeOut)) {
80                         consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
81                 }
82                 synchronized (kConsumer) {
83                         kConsumer.subscribe(Arrays.asList(topic));
84                 }
85         }
86
87         private Message makeMessage(final ConsumerRecord<String, String> msg) {
88                 return new Message() {
89                         @Override
90                         public long getOffset() {
91                                 offset = msg.offset();
92                                 return offset;
93                         }
94
95                         @Override
96                         public String getMessage() {
97                                 return new String(msg.value());
98                         }
99                 };
100         }
101
102         @Override
103         public synchronized Message nextMessage() {
104
105                 try {
106                         if (!fPendingMsgs.isEmpty()) {
107                                 return makeMessage(fPendingMsgs.take());
108                         }
109                 } catch (InterruptedException x) {
110                         log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
111                                         x);
112                         Thread.currentThread().interrupt();
113                 }
114
115                 Callable<Boolean> run = new Callable<Boolean>() {
116                         @Override
117                         public Boolean call() throws Exception {
118                                 try {
119                                         ConsumerRecords<String, String> records;
120                                         synchronized (kConsumer) {
121                                                 records = kConsumer.poll(500);
122                                         }
123                                         for (ConsumerRecord<String, String> record : records) {
124
125                                                 fPendingMsgs.offer(record);
126                                         }
127
128                                 } catch (KafkaException x) {
129                                         log.debug(fLogTag + ": KafkaException ", x);
130
131                                 } catch (IllegalStateException | IllegalArgumentException x) {
132                                         log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
133
134                                 }
135
136
137                                 return true;
138                         }
139                 };
140
141                 @SuppressWarnings({ "rawtypes", "unchecked" })
142                 RunnableFuture future = new FutureTask(run);
143                 ExecutorService service = Executors.newSingleThreadExecutor();
144                 service.execute(future);
145                 try {
146                         future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
147                         // second
148                 } catch (TimeoutException ex) {
149                 log.error("TimeoutException in in Kafka consumer ", ex);
150                         // timed out. Try to stop the code if possible.
151                         String apiNodeId = null;
152                         try {
153                                 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
154                         } catch (UnknownHostException e1) {
155                                 log.error("unable to get the localhost address ", e1);
156                         }
157
158                         try {
159                                 if (fKafkaLiveLockAvoider != null)
160                                         fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
161                         } catch (Exception e) {
162                                 log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
163                         }
164
165                         forcePollOnConsumer();
166                         future.cancel(true);
167                 } catch (Exception ex) {
168             log.error("Exception in in Kafka consumer ", ex);
169                         // timed out. Try to stop the code if possible.
170                         future.cancel(true);
171                 }
172                 service.shutdown();
173
174                 return null;
175
176         }
177
178         /**
179          * getName() method returns string type value. returns 3 parameters in
180          * string:- fTopic,fGroup,fId
181          *
182          * @Override
183          */
184         public String getName() {
185                 return fTopic + " : " + fGroup + " : " + fId;
186         }
187
188         /**
189          * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
190          * variable value
191          *
192          * @Override
193          *
194          */
195         public long getCreateTimeMs() {
196                 return fCreateTimeMs;
197         }
198
199         public KafkaConsumer<String, String> getConsumer() {
200                 return kConsumer;
201         }
202
203         /**
204          * getLastAccessMs() method returns long type value. returns fLastTouch
205          * variable value
206          *
207          * @Override
208          *
209          */
210         public long getLastAccessMs() {
211                 return fLastTouch;
212         }
213
214         /**
215          * getOffset() method returns long type value. returns offset variable value
216          *
217          * @Override
218          *
219          */
220         public long getOffset() {
221                 return offset;
222         }
223
224         /**
225          * commit offsets commitOffsets() method will be called on closed of
226          * KafkaConsumer.
227          *
228          * @Override
229          *
230          *
231          *                      public void commitOffsets() { if (getState() ==
232          *           KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
233          *           on closed KafkaConsumer " + getName()); return; }
234          *           fConnector.commitOffsets(); }
235          */
236
237         /**
238          * updating fLastTouch with current time in ms
239          */
240         public void touch() {
241                 fLastTouch = System.currentTimeMillis();
242         }
243
244         /**
245          * getLastTouch() method returns long type value. returns fLastTouch
246          * variable value
247          *
248          */
249         public long getLastTouch() {
250                 return fLastTouch;
251         }
252
253         /**
254          * setting the kafkaConsumer state to closed
255          */
256
257         public boolean close() {
258                 if (getState() == State.CLOSED) {
259
260                         log.error("close() called on closed KafkaConsumer " + getName());
261                         return true;
262                 }
263
264
265                 boolean retVal = kafkaConnectorshuttask();
266                 return retVal;
267
268         }
269
270         /* time out if the kafka shutdown fails for some reason */
271
272         private boolean kafkaConnectorshuttask() {
273                 Callable<Boolean> run = new Callable<Boolean>() {
274                         @Override
275                         public Boolean call() throws Exception {
276
277                                 try {
278
279                                         kConsumer.close();
280
281                                 } catch (Exception e) {
282                                         log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
283                                         throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
284
285                                 }
286                                 log.info("Kafka connection closure with in 15 seconds by a Executors task");
287
288                                 return true;
289                         }
290                 };
291
292                 @SuppressWarnings({ "rawtypes", "unchecked" })
293                 RunnableFuture future = new FutureTask(run);
294                 ExecutorService service = Executors.newSingleThreadExecutor();
295                 service.execute(future);
296                 try {
297                    future.get(200, TimeUnit.SECONDS); // wait 1
298                         // second
299                 } catch (TimeoutException ex) {
300                         // timed out. Try to stop the code if possible.
301                         log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
302                         future.cancel(true);
303                         setState(State.OPENED);
304                 } catch (Exception ex) {
305                         // timed out. Try to stop the code if possible.
306                         log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
307                         future.cancel(true);
308                         setState(State.OPENED);
309                         return false;
310                 }
311                 service.shutdown();
312                 setState(State.CLOSED);
313                 return true;
314         }
315
316         public void forcePollOnConsumer() {
317                 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
318
319         }
320
321         /**
322          * getConsumerGroup() returns Consumer group
323          *
324          * @return
325          */
326         public String getConsumerGroup() {
327                 return fGroup;
328         }
329
330         /**
331          * getConsumerId returns Consumer Id
332          *
333          * @return
334          */
335         public String getConsumerId() {
336                 return fId;
337         }
338
339         /**
340          * getState returns kafkaconsumer state
341          *
342          * @return
343          */
344         private State getState() {
345                 return this.state;
346         }
347
348         /**
349          * setState() sets the kafkaConsumer state
350          *
351          * @param state
352          */
353         private void setState(State state) {
354                 this.state = state;
355         }
356
357
358         private final String fTopic;
359         private final String fGroup;
360         private final String fId;
361         private final String fLogTag;
362
363         private KafkaConsumer<String, String> kConsumer;
364         private long fCreateTimeMs;
365         private long fLastTouch;
366         private long offset;
367         private State state;
368         private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
369         private int consumerPollTimeOut=5;
370         private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
371         private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
372
373         @Override
374         public void commitOffsets() {
375                 if (getState() == State.CLOSED) {
376                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
377                         return;
378                 }
379                 kConsumer.commitSync();
380                 
381
382         }
383
384         @Override
385         public void setOffset(long offsetval) {
386                 offset = offsetval;
387         }
388
389         
390         public void setConsumerCache(KafkaConsumerCache cache) {
391         }
392
393         
394 }