9be907391f42709b63cbbc9cc503ec8b3e68ab7c
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / backends / kafka / Kafka011Consumer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.backends.kafka;
23
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
41
42 import com.att.dmf.mr.backends.Consumer;
43 import com.att.dmf.mr.constants.CambriaConstants;
44
45
46
47 import com.att.eelf.configuration.EELFLogger;
48 import com.att.eelf.configuration.EELFManager;
49
50 /**
51  * A consumer instance that's created per-request. These are stateless so that
52  * clients can connect to this service as a proxy.
53  * 
54  * @author Ram
55  *
56  */
57 public class Kafka011Consumer implements Consumer {
58         private enum State {
59                 OPENED, CLOSED
60         }
61
62         
63         /**
64          * KafkaConsumer() is constructor. It has following 4 parameters:-
65          * 
66          * @param topic
67          * @param group
68          * @param id
69          * @param cc
70          * 
71          */
72
73         public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
74                         KafkaLiveLockAvoider2 klla) throws Exception {
75                 fTopic = topic;
76                 fGroup = group;
77                 fId = id;
78                 fCreateTimeMs = System.currentTimeMillis();
79                 fLastTouch = fCreateTimeMs;
80                 fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
81                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
82                 offset = 0;
83                 state = Kafka011Consumer.State.OPENED;
84                 kConsumer = cc;
85                 fKafkaLiveLockAvoider = klla;
86                 synchronized (kConsumer) {
87                         kConsumer.subscribe(Arrays.asList(topic));
88                 }
89         }
90
91         private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
92                 return new Consumer.Message() {
93                         @Override
94                         public long getOffset() {
95                                 offset = msg.offset();
96                                 return offset;
97                         }
98
99                         @Override
100                         public String getMessage() {
101                                 return new String(msg.value());
102                         }
103                 };
104         }
105
106         @Override
107         public synchronized Consumer.Message nextMessage() {
108
109                 try {
110                         if (fPendingMsgs.size() > 0) {
111                                 return makeMessage(fPendingMsgs.take());
112                         }
113                 } catch (InterruptedException x) {
114                         log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
115                                         x);
116                 }
117
118                 Callable<Boolean> run = new Callable<Boolean>() {
119                         @Override
120                         public Boolean call() throws Exception {
121                                 try {
122                                         ConsumerRecords<String, String> records;
123                                         synchronized (kConsumer) {
124                                                 records = kConsumer.poll(500);
125                                         }
126                                         for (ConsumerRecord<String, String> record : records) {
127                                                 
128                                                 fPendingMsgs.offer(record);
129                                         }
130
131                                 } catch (KafkaException x) {
132                                         log.debug(fLogTag + ": KafkaException " + x.getMessage());
133
134                                 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
135                                         log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
136                                                         + x.getMessage());
137
138                                 }
139
140                                 
141                                 return true;
142                         }
143                 };
144
145                 @SuppressWarnings({ "rawtypes", "unchecked" })
146                 RunnableFuture future = new FutureTask(run);
147                 ExecutorService service = Executors.newSingleThreadExecutor();
148                 service.execute(future);
149                 try {
150                         future.get(5, TimeUnit.SECONDS); // wait 1
151                         // second
152                 } catch (TimeoutException ex) {
153                         // timed out. Try to stop the code if possible.
154                         String apiNodeId = null;
155                         try {
156                                 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
157                         } catch (UnknownHostException e1) {
158                                 // TODO Auto-generated catch block
159                                 log.error("unable to get the localhost address");
160                         }
161
162                         try {
163                                 if (fKafkaLiveLockAvoider != null)
164                                         fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
165                         } catch (Exception e) {
166                                 log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
167                         }
168
169                         forcePollOnConsumer();
170                         future.cancel(true);
171                 } catch (Exception ex) {
172                         // timed out. Try to stop the code if possible.
173                         future.cancel(true);
174                 }
175                 service.shutdown();
176
177                 return null;
178
179         }
180
181         /**
182          * getName() method returns string type value. returns 3 parameters in
183          * string:- fTopic,fGroup,fId
184          * 
185          * @Override
186          */
187         public String getName() {
188                 return fTopic + " : " + fGroup + " : " + fId;
189         }
190
191         /**
192          * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
193          * variable value
194          * 
195          * @Override
196          * 
197          */
198         public long getCreateTimeMs() {
199                 return fCreateTimeMs;
200         }
201
202         public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
203                 return kConsumer;
204         }
205
206         /**
207          * getLastAccessMs() method returns long type value. returns fLastTouch
208          * variable value
209          * 
210          * @Override
211          * 
212          */
213         public long getLastAccessMs() {
214                 return fLastTouch;
215         }
216
217         /**
218          * getOffset() method returns long type value. returns offset variable value
219          * 
220          * @Override
221          * 
222          */
223         public long getOffset() {
224                 return offset;
225         }
226
227         /**
228          * commit offsets commitOffsets() method will be called on closed of
229          * KafkaConsumer.
230          * 
231          * @Override
232          * 
233          *
234          *                      public void commitOffsets() { if (getState() ==
235          *           KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
236          *           on closed KafkaConsumer " + getName()); return; }
237          *           fConnector.commitOffsets(); }
238          */
239
240         /**
241          * updating fLastTouch with current time in ms
242          */
243         public void touch() {
244                 fLastTouch = System.currentTimeMillis();
245         }
246
247         /**
248          * getLastTouch() method returns long type value. returns fLastTouch
249          * variable value
250          * 
251          */
252         public long getLastTouch() {
253                 return fLastTouch;
254         }
255
256         /**
257          * setting the kafkaConsumer state to closed
258          */
259         
260         public boolean close() {
261                 if (getState() == Kafka011Consumer.State.CLOSED) {
262
263                         log.error("close() called on closed KafkaConsumer " + getName());
264                         return true;
265                 }
266
267                 
268                 boolean retVal = kafkaConnectorshuttask();
269                 return retVal;
270
271         }
272
273         /* time out if the kafka shutdown fails for some reason */
274
275         private boolean kafkaConnectorshuttask() {
276                 Callable<Boolean> run = new Callable<Boolean>() {
277                         @Override
278                         public Boolean call() throws Exception {
279
280                                 try {
281                                         
282                                         kConsumer.close();
283
284                                 } catch (Exception e) {
285                                         log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
286                                         throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
287                                         
288                                 }
289                                 log.info("Kafka connection closure with in 15 seconds by a Executors task");
290
291                                 return true;
292                         }
293                 };
294
295                 @SuppressWarnings({ "rawtypes", "unchecked" })
296                 RunnableFuture future = new FutureTask(run);
297                 ExecutorService service = Executors.newSingleThreadExecutor();
298                 service.execute(future);
299                 try {
300                    future.get(200, TimeUnit.SECONDS); // wait 1
301                         // second
302                 } catch (TimeoutException ex) {
303                         // timed out. Try to stop the code if possible.
304                         log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
305                         future.cancel(true);
306                         setState(Kafka011Consumer.State.OPENED);
307                 } catch (Exception ex) {
308                         // timed out. Try to stop the code if possible.
309                         log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
310                                         + ex);
311                         future.cancel(true);
312                         setState(Kafka011Consumer.State.OPENED);
313                         return false;
314                 }
315                 service.shutdown();
316                 setState(Kafka011Consumer.State.CLOSED);
317                 return true;
318         }
319
320         public void forcePollOnConsumer() {
321                 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
322
323         }
324
325         /**
326          * getConsumerGroup() returns Consumer group
327          * 
328          * @return
329          */
330         public String getConsumerGroup() {
331                 return fGroup;
332         }
333
334         /**
335          * getConsumerId returns Consumer Id
336          * 
337          * @return
338          */
339         public String getConsumerId() {
340                 return fId;
341         }
342
343         /**
344          * getState returns kafkaconsumer state
345          * 
346          * @return
347          */
348         private Kafka011Consumer.State getState() {
349                 return this.state;
350         }
351
352         /**
353          * setState() sets the kafkaConsumer state
354          * 
355          * @param state
356          */
357         private void setState(Kafka011Consumer.State state) {
358                 this.state = state;
359         }
360
361         
362         private final String fTopic;
363         private final String fGroup;
364         private final String fId;
365         private final String fLogTag;
366         
367         private KafkaConsumer<String, String> kConsumer;
368         private long fCreateTimeMs;
369         private long fLastTouch;
370         private long offset;
371         private Kafka011Consumer.State state;
372         private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
373         private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
374         private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
375         
376         @Override
377         public void commitOffsets() {
378                 if (getState() == Kafka011Consumer.State.CLOSED) {
379                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
380                         return;
381                 }
382                 kConsumer.commitSync();
383                 
384
385         }
386
387         @Override
388         public void setOffset(long offsetval) {
389                 offset = offsetval;
390         }
391
392         
393         public void setConsumerCache(KafkaConsumerCache cache) {
394         }
395
396         
397 }