2ec323e00339c278d966c7bd7b82fd85de373388
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / Kafka011Consumer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
23
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
41
42 import org.onap.dmaap.dmf.mr.backends.Consumer;
43 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
44
45 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
46 import com.att.eelf.configuration.EELFLogger;
47 import com.att.eelf.configuration.EELFManager;
48
49 /**
50  * A consumer instance that's created per-request. These are stateless so that
51  * clients can connect to this service as a proxy.
52  * 
53  * @author Ram
54  *
55  */
56 public class Kafka011Consumer implements Consumer {
57         private enum State {
58                 OPENED, CLOSED
59         }
60
61         
62         /**
63          * KafkaConsumer() is constructor. It has following 4 parameters:-
64          * 
65          * @param topic
66          * @param group
67          * @param id
68          * @param cc
69          * 
70          */
71
72         public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
73                         KafkaLiveLockAvoider2 klla) throws Exception {
74                 fTopic = topic;
75                 fGroup = group;
76                 fId = id;
77                 fCreateTimeMs = System.currentTimeMillis();
78                 fLastTouch = fCreateTimeMs;
79                 fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
80                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
81                 offset = 0;
82                 state = Kafka011Consumer.State.OPENED;
83                 kConsumer = cc;
84                 fKafkaLiveLockAvoider = klla;
85                 
86                 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
87                                 "consumer.timeout");
88                 if (null != consumerTimeOut) {
89                         consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
90                 }
91                 synchronized (kConsumer) {
92                         kConsumer.subscribe(Arrays.asList(topic));
93                 }
94         }
95
96         private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
97                 return new Consumer.Message() {
98                         @Override
99                         public long getOffset() {
100                                 offset = msg.offset();
101                                 return offset;
102                         }
103
104                         @Override
105                         public String getMessage() {
106                                 return new String(msg.value());
107                         }
108                 };
109         }
110
111         @Override
112         public synchronized Consumer.Message nextMessage() {
113
114                 try {
115                         if (fPendingMsgs.size() > 0) {
116                                 return makeMessage(fPendingMsgs.take());
117                         }
118                 } catch (InterruptedException x) {
119                         log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
120                                         x);
121                 }
122
123                 Callable<Boolean> run = new Callable<Boolean>() {
124                         @Override
125                         public Boolean call() throws Exception {
126                                 try {
127                                         ConsumerRecords<String, String> records;
128                                         synchronized (kConsumer) {
129                                                 records = kConsumer.poll(500);
130                                         }
131                                         for (ConsumerRecord<String, String> record : records) {
132                                                 
133                                                 fPendingMsgs.offer(record);
134                                         }
135
136                                 } catch (KafkaException x) {
137                                         log.debug(fLogTag + ": KafkaException " + x.getMessage());
138
139                                 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
140                                         log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
141                                                         + x.getMessage());
142
143                                 }
144
145                                 
146                                 return true;
147                         }
148                 };
149
150                 @SuppressWarnings({ "rawtypes", "unchecked" })
151                 RunnableFuture future = new FutureTask(run);
152                 ExecutorService service = Executors.newSingleThreadExecutor();
153                 service.execute(future);
154                 try {
155                         future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
156                         // second
157                 } catch (TimeoutException ex) {
158                         // timed out. Try to stop the code if possible.
159                         String apiNodeId = null;
160                         try {
161                                 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
162                         } catch (UnknownHostException e1) {
163                                 // TODO Auto-generated catch block
164                                 log.error("unable to get the localhost address");
165                         }
166
167                         try {
168                                 if (fKafkaLiveLockAvoider != null)
169                                         fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
170                         } catch (Exception e) {
171                                 log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
172                         }
173
174                         forcePollOnConsumer();
175                         future.cancel(true);
176                 } catch (Exception ex) {
177                         // timed out. Try to stop the code if possible.
178                         future.cancel(true);
179                 }
180                 service.shutdown();
181
182                 return null;
183
184         }
185
186         /**
187          * getName() method returns string type value. returns 3 parameters in
188          * string:- fTopic,fGroup,fId
189          * 
190          * @Override
191          */
192         public String getName() {
193                 return fTopic + " : " + fGroup + " : " + fId;
194         }
195
196         /**
197          * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
198          * variable value
199          * 
200          * @Override
201          * 
202          */
203         public long getCreateTimeMs() {
204                 return fCreateTimeMs;
205         }
206
207         public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
208                 return kConsumer;
209         }
210
211         /**
212          * getLastAccessMs() method returns long type value. returns fLastTouch
213          * variable value
214          * 
215          * @Override
216          * 
217          */
218         public long getLastAccessMs() {
219                 return fLastTouch;
220         }
221
222         /**
223          * getOffset() method returns long type value. returns offset variable value
224          * 
225          * @Override
226          * 
227          */
228         public long getOffset() {
229                 return offset;
230         }
231
232         /**
233          * commit offsets commitOffsets() method will be called on closed of
234          * KafkaConsumer.
235          * 
236          * @Override
237          * 
238          *
239          *                      public void commitOffsets() { if (getState() ==
240          *           KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
241          *           on closed KafkaConsumer " + getName()); return; }
242          *           fConnector.commitOffsets(); }
243          */
244
245         /**
246          * updating fLastTouch with current time in ms
247          */
248         public void touch() {
249                 fLastTouch = System.currentTimeMillis();
250         }
251
252         /**
253          * getLastTouch() method returns long type value. returns fLastTouch
254          * variable value
255          * 
256          */
257         public long getLastTouch() {
258                 return fLastTouch;
259         }
260
261         /**
262          * setting the kafkaConsumer state to closed
263          */
264         
265         public boolean close() {
266                 if (getState() == Kafka011Consumer.State.CLOSED) {
267
268                         log.error("close() called on closed KafkaConsumer " + getName());
269                         return true;
270                 }
271
272                 
273                 boolean retVal = kafkaConnectorshuttask();
274                 return retVal;
275
276         }
277
278         /* time out if the kafka shutdown fails for some reason */
279
280         private boolean kafkaConnectorshuttask() {
281                 Callable<Boolean> run = new Callable<Boolean>() {
282                         @Override
283                         public Boolean call() throws Exception {
284
285                                 try {
286                                         
287                                         kConsumer.close();
288
289                                 } catch (Exception e) {
290                                         log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
291                                         throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
292                                         
293                                 }
294                                 log.info("Kafka connection closure with in 15 seconds by a Executors task");
295
296                                 return true;
297                         }
298                 };
299
300                 @SuppressWarnings({ "rawtypes", "unchecked" })
301                 RunnableFuture future = new FutureTask(run);
302                 ExecutorService service = Executors.newSingleThreadExecutor();
303                 service.execute(future);
304                 try {
305                    future.get(200, TimeUnit.SECONDS); // wait 1
306                         // second
307                 } catch (TimeoutException ex) {
308                         // timed out. Try to stop the code if possible.
309                         log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
310                         future.cancel(true);
311                         setState(Kafka011Consumer.State.OPENED);
312                 } catch (Exception ex) {
313                         // timed out. Try to stop the code if possible.
314                         log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
315                                         + ex);
316                         future.cancel(true);
317                         setState(Kafka011Consumer.State.OPENED);
318                         return false;
319                 }
320                 service.shutdown();
321                 setState(Kafka011Consumer.State.CLOSED);
322                 return true;
323         }
324
325         public void forcePollOnConsumer() {
326                 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
327
328         }
329
330         /**
331          * getConsumerGroup() returns Consumer group
332          * 
333          * @return
334          */
335         public String getConsumerGroup() {
336                 return fGroup;
337         }
338
339         /**
340          * getConsumerId returns Consumer Id
341          * 
342          * @return
343          */
344         public String getConsumerId() {
345                 return fId;
346         }
347
348         /**
349          * getState returns kafkaconsumer state
350          * 
351          * @return
352          */
353         private Kafka011Consumer.State getState() {
354                 return this.state;
355         }
356
357         /**
358          * setState() sets the kafkaConsumer state
359          * 
360          * @param state
361          */
362         private void setState(Kafka011Consumer.State state) {
363                 this.state = state;
364         }
365
366         
367         private final String fTopic;
368         private final String fGroup;
369         private final String fId;
370         private final String fLogTag;
371         
372         private KafkaConsumer<String, String> kConsumer;
373         private long fCreateTimeMs;
374         private long fLastTouch;
375         private long offset;
376         private Kafka011Consumer.State state;
377         private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
378         private int consumerPollTimeOut=5;
379         private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
380         private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
381         
382         @Override
383         public void commitOffsets() {
384                 if (getState() == Kafka011Consumer.State.CLOSED) {
385                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
386                         return;
387                 }
388                 kConsumer.commitSync();
389                 
390
391         }
392
393         @Override
394         public void setOffset(long offsetval) {
395                 offset = offsetval;
396         }
397
398         
399         public void setConsumerCache(KafkaConsumerCache cache) {
400         }
401
402         
403 }