dd6259f94b99ef846e6353eb9ae6fd478134c774
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / backends / kafka / KafkaConsumer.txt
1 package com.att.dmf.mr.backends.kafka;
2
3 import java.util.Arrays;
4 import java.util.Properties;
5 import java.util.concurrent.Callable;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.FutureTask;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.RunnableFuture;
11 import java.util.concurrent.TimeUnit;
12 import java.util.concurrent.TimeoutException;
13
14 import org.apache.kafka.clients.consumer.ConsumerRecord;
15 import org.apache.kafka.clients.consumer.ConsumerRecords;
16 import org.apache.kafka.common.KafkaException;
17
18 import com.att.dmf.mr.backends.Consumer;
19
20 //import org.slf4j.Logger;
21 //import org.slf4j.LoggerFactory;
22
23 import com.att.eelf.configuration.EELFLogger;
24 import com.att.eelf.configuration.EELFManager;
25
26 /**
27  * A consumer instance that's created per-request. These are stateless so that
28  * clients can connect to this service as a proxy.
29  * 
30  * @author peter
31  *
32  */
33 public class KafkaConsumer implements Consumer {
34         private enum State {
35                 OPENED, CLOSED
36         }
37
38         /**
39          * KafkaConsumer() is constructor. It has following 4 parameters:-
40          * 
41          * @param topic
42          * @param group
43          * @param id
44          * @param cc
45          * 
46          */
47
48         public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception {
49                 fTopic = topic;
50                 fGroup = group;
51                 fId = id;
52                 // fConnector = cc;
53
54                 fCreateTimeMs = System.currentTimeMillis();
55                 fLastTouch = fCreateTimeMs;
56                 fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> ();
57                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
58                 offset = 0;
59
60                 state = KafkaConsumer.State.OPENED;
61
62                 // final Map<String, Integer> topicCountMap = new HashMap<String,
63                 // Integer>();
64                 // topicCountMap.put(fTopic, 1);
65                 // log.info(fLogTag +" kafka Consumer started at "
66                 // +System.currentTimeMillis());
67                 // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
68                 // fConnector.createMessageStreams(topicCountMap);
69                 // final List<KafkaStream<byte[], byte[]>> streams =
70                 // consumerMap.get(fTopic);
71
72                 kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
73                 // System.out.println("I am in Consumer APP " + topic + "-- " +
74                 // fConsumer);
75                 kConsumer.subscribe(Arrays.asList(topic));
76                 log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
77         System.out.println("-----id " +id);
78         
79                 
80           try { ConsumerRecords<String, String> records =
81                                                    kConsumer.poll(500); System.out.println("---" +
82                                                     records.count());
83                                           
84                                                     for (ConsumerRecord<String, String> record : records) {
85                                                     System.out.printf("offset = %d, key = %s, value = %s",
86                                                     record.offset(), record.key(), record.value()); String t =
87                                                     record.value();
88                                           
89                                                     }
90                   }catch(Exception e){
91                           System.out.println( e);
92                   }
93                         System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
94                         kConsumer.commitSync();
95                 //  fConsumer.close();  
96         
97
98                 /*
99                  * ConsumerRecords<String, String> records = fConsumer.poll(500);
100                  * System.out.println("---" + records.count());
101                  * 
102                  * for (ConsumerRecord<String, String> record : records) {
103                  * System.out.printf("offset = %d, key = %s, value = %s",
104                  * record.offset(), record.key(), record.value()); String t =
105                  * record.value();
106                  * 
107                  * }
108                  * 
109                  * 
110                  * fConsumer.commitSync(); fConsumer.close();
111                  */
112
113                 // fStream = streams.iterator().next();
114         }
115         
116         
117         
118         private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg )
119         {
120                 return new Consumer.Message()
121                 {
122                         @Override
123                         public long getOffset ()
124                         {
125                                 return msg.offset ();
126                         }
127                         
128                         @Override
129                         public String getMessage ()
130                         {
131                                 return new String ( msg.value () );
132                         }
133                 };
134         }
135         
136         @Override
137         public synchronized Consumer.Message nextMessage ()
138         {
139                 
140                 try
141                 {
142                         if ( fPendingMsgs.size () > 0 )
143                         {
144                                 return makeMessage ( fPendingMsgs.take () );
145                         }
146                 }
147                 catch ( InterruptedException x )
148                 {
149                         log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x );
150                 }
151                 
152                 
153                         try
154                         {
155                                 boolean foundMsgs = false;
156                                 System.out.println("entering into pollingWWWWWWWWWWWWWWWWW");
157                                 final ConsumerRecords<String,String> records = kConsumer.poll ( 100 );
158                                 System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX....");
159                                 for ( ConsumerRecord<String,String> record : records )
160                                 {
161                                         foundMsgs = true;
162                                         fPendingMsgs.offer ( record );
163                                 }
164                         
165                         }
166                         catch ( KafkaException x )
167                         {
168                                 log.debug ( fLogTag + ": KafkaException " + x.getMessage () );
169                                 
170                         }
171                         catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x )
172                         {
173                                 log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () );
174                         
175                         }
176                                 
177                 return null;
178         }
179         
180         
181
182         /**
183          * getName() method returns string type value. returns 3 parameters in
184          * string:- fTopic,fGroup,fId
185          * 
186          * @Override
187          */
188         public String getName() {
189                 return fTopic + " : " + fGroup + " : " + fId;
190         }
191
192         /**
193          * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
194          * variable value
195          * 
196          * @Override
197          * 
198          */
199         public long getCreateTimeMs() {
200                 return fCreateTimeMs;
201         }
202
203         public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() {
204                 return kConsumer;
205         }
206
207         /**
208          * getLastAccessMs() method returns long type value. returns fLastTouch
209          * variable value
210          * 
211          * @Override
212          * 
213          */
214         public long getLastAccessMs() {
215                 return fLastTouch;
216         }
217
218         
219
220         /**
221          * getOffset() method returns long type value. returns offset variable value
222          * 
223          * @Override
224          * 
225          */
226         public long getOffset() {
227                 return offset;
228         }
229
230         /**
231          * commit offsets commitOffsets() method will be called on closed of
232          * KafkaConsumer.
233          * 
234          * @Override
235          * 
236          *
237          *                      public void commitOffsets() { if (getState() ==
238          *           KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
239          *           on closed KafkaConsumer " + getName()); return; }
240          *           fConnector.commitOffsets(); }
241          */
242
243         /**
244          * updating fLastTouch with current time in ms
245          */
246         public void touch() {
247                 fLastTouch = System.currentTimeMillis();
248         }
249
250         /**
251          * getLastTouch() method returns long type value. returns fLastTouch
252          * variable value
253          * 
254          */
255         public long getLastTouch() {
256                 return fLastTouch;
257         }
258
259         /**
260          * setting the kafkaConsumer state to closed
261          */
262         public synchronized boolean close() {
263
264                 if (getState() == KafkaConsumer.State.CLOSED) {
265
266                         log.warn("close() called on closed KafkaConsumer " + getName());
267                         return true;
268                 }
269
270                 setState(KafkaConsumer.State.CLOSED);
271                 // fConnector.shutdown();
272                 boolean retVal = kafkaConnectorshuttask();
273                 return retVal;
274
275         }
276
277         /* time out if the kafka shutdown fails for some reason */
278
279         private boolean kafkaConnectorshuttask() {
280                 Callable<Boolean> run = new Callable<Boolean>() {
281                         @Override
282                         public Boolean call() throws Exception {
283                                 // your code to be timed
284                                 try {
285                                 System.out.println("consumer closing....." + kConsumer);
286                                         kConsumer.close();
287                                 } catch (Exception e) {
288                                         log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
289                                 }
290                                 log.info("Kafka connection closure with in 15 seconds by a Executors task");
291                                 return true;
292                         }
293                 };
294
295                 RunnableFuture future = new FutureTask(run);
296                 ExecutorService service = Executors.newSingleThreadExecutor();
297                 service.execute(future);
298                 Boolean result = null;
299                 try {
300                         result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1
301                                                                                                                                         // second
302                 } catch (TimeoutException ex) {
303                         // timed out. Try to stop the code if possible.
304                         log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task");
305                         future.cancel(true);
306                 } catch (Exception ex) {
307                         // timed out. Try to stop the code if possible.
308                         log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex);
309                         future.cancel(true);
310                         return false;
311                 }
312                 service.shutdown();
313                 return true;
314         }
315
316         /**
317          * getConsumerGroup() returns Consumer group
318          * 
319          * @return
320          */
321         public String getConsumerGroup() {
322                 return fGroup;
323         }
324
325         /**
326          * getConsumerId returns Consumer Id
327          * 
328          * @return
329          */
330         public String getConsumerId() {
331                 return fId;
332         }
333
334         /**
335          * getState returns kafkaconsumer state
336          * 
337          * @return
338          */
339         private KafkaConsumer.State getState() {
340                 return this.state;
341         }
342
343         /**
344          * setState() sets the kafkaConsumer state
345          * 
346          * @param state
347          */
348         private void setState(KafkaConsumer.State state) {
349                 this.state = state;
350         }
351
352         // private ConsumerConnector fConnector;
353         private final String fTopic;
354         private final String fGroup;
355         private final String fId;
356         private final String fLogTag;
357         // private final KafkaStream<byte[], byte[]> fStream;
358         private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer;
359         private long fCreateTimeMs;
360         private long fLastTouch;
361         private long offset;
362         private KafkaConsumer.State state;
363         private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
364         private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs;
365         // private static final Logger log =
366         // LoggerFactory.getLogger(KafkaConsumer.class);
367
368         @Override
369         public void commitOffsets() {
370                 if (getState() == KafkaConsumer.State.CLOSED) {
371                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
372                         return;
373                 }
374                 kConsumer.commitSync();
375                 // fConsumer.close();
376
377         }
378
379
380
381         @Override
382         public void setOffset(long offsetval) {
383                 // TODO Auto-generated method stub
384                 offset = offsetval;
385         }
386 }