1 package com.att.dmf.mr.backends.kafka;
3 import java.util.Arrays;
4 import java.util.Properties;
5 import java.util.concurrent.Callable;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8 import java.util.concurrent.FutureTask;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.RunnableFuture;
11 import java.util.concurrent.TimeUnit;
12 import java.util.concurrent.TimeoutException;
14 import org.apache.kafka.clients.consumer.ConsumerRecord;
15 import org.apache.kafka.clients.consumer.ConsumerRecords;
16 import org.apache.kafka.common.KafkaException;
18 import com.att.dmf.mr.backends.Consumer;
20 //import org.slf4j.Logger;
21 //import org.slf4j.LoggerFactory;
23 import com.att.eelf.configuration.EELFLogger;
24 import com.att.eelf.configuration.EELFManager;
27 * A consumer instance that's created per-request. These are stateless so that
28 * clients can connect to this service as a proxy.
33 public class KafkaConsumer implements Consumer {
39 * KafkaConsumer() is the constructor. It takes the following 4 parameters:
// Creates a consumer and subscribes it to a single topic.
//
// Visible effects: stores the creation time in fCreateTimeMs and fLastTouch,
// allocates the pending-record queue, builds the log tag as
// fGroup + "(" + fId + ")/" + fTopic, sets the state to OPENED, constructs the
// Kafka client from prop and subscribes it to topic.
//
// topic - topic name passed to subscribe()
// group - consumer group; presumably stored in fGroup (assignment not visible here)
// id    - consumer id; printed to stdout below, presumably stored in fId
// prop  - properties handed unchanged to the Kafka client constructor
// Declares "throws Exception"; no explicit throw is visible in this excerpt.
//
// NOTE(review): this excerpt omits lines of the constructor. The assignments of
// the final fields fTopic/fGroup/fId are not shown, yet fLogTag reads them on
// line 57 - confirm against the full file that they are assigned first.
48 public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception {
54 fCreateTimeMs = System.currentTimeMillis();
55 fLastTouch = fCreateTimeMs;
56 fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String,String>> ();
57 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
60 state = KafkaConsumer.State.OPENED;
// The commented-out lines below reference fConnector / KafkaStream / createMessageStreams
// and are dead code kept next to the kConsumer-based code that follows.
62 // final Map<String, Integer> topicCountMap = new HashMap<String,
64 // topicCountMap.put(fTopic, 1);
65 // log.info(fLogTag +" kafka Consumer started at "
66 // +System.currentTimeMillis());
67 // final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
68 // fConnector.createMessageStreams(topicCountMap);
69 // final List<KafkaStream<byte[], byte[]>> streams =
70 // consumerMap.get(fTopic);
72 kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
73 // System.out.println("I am in Consumer APP " + topic + "-- " +
75 kConsumer.subscribe(Arrays.asList(topic));
76 log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
// NOTE(review): debug output goes to stdout although the class has an EELF
// logger (log); consider routing it through the logger or removing it.
77 System.out.println("-----id " +id);
// NOTE(review): lines 80-94 read like a one-off poll / print / commit experiment.
// The comment delimiters are not visible in this excerpt, so it cannot be told
// from here whether this section is live code or commented out - confirm.
80 try { ConsumerRecords<String, String> records =
81 kConsumer.poll(500); System.out.println("---" +
84 for (ConsumerRecord<String, String> record : records) {
85 System.out.printf("offset = %d, key = %s, value = %s",
86 record.offset(), record.key(), record.value()); String t =
91 System.out.println( e);
93 System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
94 kConsumer.commitSync();
// The star-prefixed lines below appear to be an older, commented-out variant of
// the same experiment written against a variable named fConsumer.
99 * ConsumerRecords<String, String> records = fConsumer.poll(500);
100 * System.out.println("---" + records.count());
102 * for (ConsumerRecord<String, String> record : records) {
103 * System.out.printf("offset = %d, key = %s, value = %s",
104 * record.offset(), record.key(), record.value()); String t =
110 * fConsumer.commitSync(); fConsumer.close();
113 // fStream = streams.iterator().next();
// Wraps a Kafka record in an anonymous Consumer.Message.
// getOffset() returns the record's offset; getMessage() returns a new String
// built from the record's value.
// NOTE(review): the record is ConsumerRecord<String,String>, so msg.value() is
// already a String and the "new String(...)" copy looks redundant. It would also
// throw NullPointerException for a null value - confirm whether null-valued
// records can reach this method.
118 private Consumer.Message makeMessage ( final ConsumerRecord<String,String> msg )
120 return new Consumer.Message()
123 public long getOffset ()
125 return msg.offset ();
129 public String getMessage ()
131 return new String ( msg.value () );
// Returns the next buffered message, refilling the buffer from Kafka.
//
// Visible flow:
//   1. If fPendingMsgs is non-empty, take() one record and return it wrapped by
//      makeMessage().
//   2. Otherwise (or after an InterruptedException in step 1) poll the Kafka
//      client with a timeout argument of 100 and offer every returned record to
//      fPendingMsgs.
// The method is synchronized on this instance.
// NOTE(review): the statement that returns after the poll is not visible in this
// excerpt - confirm what the caller receives when records were just fetched.
137 public synchronized Consumer.Message nextMessage ()
142 if ( fPendingMsgs.size () > 0 )
144 return makeMessage ( fPendingMsgs.take () );
147 catch ( InterruptedException x )
// NOTE(review): the interruption is logged and ignored; no visible line restores
// the thread's interrupt status (Thread.currentThread().interrupt()).
149 log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x );
// NOTE(review): no read of foundMsgs is visible in this excerpt - confirm it is used.
155 boolean foundMsgs = false;
// NOTE(review): the two println calls around poll() write debug markers to stdout
// on every call that reaches the poll; the class logger (log) is not used here.
156 System.out.println("entering into pollingWWWWWWWWWWWWWWWWW");
157 final ConsumerRecords<String,String> records = kConsumer.poll ( 100 );
158 System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX....");
159 for ( ConsumerRecord<String,String> record : records )
162 fPendingMsgs.offer ( record );
166 catch ( KafkaException x )
// Kafka failures are logged at debug level only, message text without stack trace.
168 log.debug ( fLogTag + ": KafkaException " + x.getMessage () );
171 catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x )
// NOTE(review): the message says "dropping stream", but no visible line closes or
// discards anything here - confirm the intended recovery.
173 log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () );
183 * getName() returns a String built from three fields,
184 * fTopic, fGroup and fId, joined by " : ".
// Returns "<fTopic> : <fGroup> : <fId>". This class uses the value in its own
// log messages (close(), commitOffsets(), the shutdown task).
188 public String getName() {
189 return fTopic + " : " + fGroup + " : " + fId;
193 * getCreateTimeMs() returns fCreateTimeMs as a long.
// Returns the System.currentTimeMillis() value captured in the constructor.
199 public long getCreateTimeMs() {
200 return fCreateTimeMs;
// Accessor for the Kafka client; presumably returns kConsumer (body not visible
// in this excerpt - confirm).
// NOTE(review): the return type is the raw KafkaConsumer, while the kConsumer
// field is declared KafkaConsumer<String, String>; callers lose the type
// parameters and get unchecked warnings.
203 public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() {
208 * getLastAccessMs() returns fLastTouch as a long.
214 public long getLastAccessMs() {
221 * getOffset() returns the value of the offset variable as a long.
226 public long getOffset() {
231 * commit offsets commitOffsets() method will be called on closed of
237 * public void commitOffsets() { if (getState() ==
238 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
239 * on closed KafkaConsumer " + getName()); return; }
240 * fConnector.commitOffsets(); }
244 * Updates fLastTouch to the current time in milliseconds.
// Records "now" (System.currentTimeMillis()) in fLastTouch.
// NOTE(review): fLastTouch is a plain long and touch() is not synchronized,
// unlike nextMessage() and close() - confirm whether cross-thread visibility of
// this timestamp matters to callers.
246 public void touch() {
247 fLastTouch = System.currentTimeMillis();
251 * getLastTouch() returns fLastTouch as a long.
255 public long getLastTouch() {
260 * setting the kafkaConsumer state to closed
// Closes this consumer; synchronized on this instance.
// If the state is already CLOSED a warning is logged (the statement that leaves
// that branch is not visible in this excerpt). Otherwise the state is set to
// CLOSED and the time-limited shutdown task is run.
262 public synchronized boolean close() {
264 if (getState() == KafkaConsumer.State.CLOSED) {
266 log.warn("close() called on closed KafkaConsumer " + getName());
// The state is flipped before the client shutdown is attempted.
270 setState(KafkaConsumer.State.CLOSED);
271 // fConnector.shutdown();
// NOTE(review): how retVal is used / returned is not visible here - confirm.
272 boolean retVal = kafkaConnectorshuttask();
277 /* time out if the kafka shutdown fails for some reason */
// Runs the client shutdown on a separate single-thread executor and waits at most
// 15 seconds for it, so that a hanging shutdown cannot block close() forever.
//
// Visible steps: build a Callable<Boolean>, wrap it in a FutureTask, execute it on
// Executors.newSingleThreadExecutor(), then future.get(15, TimeUnit.SECONDS).
// NOTE(review): the call that actually closes kConsumer, the cancel/shutdown of
// the future/executor, and the return statement are not visible in this excerpt -
// confirm in the full file that the executor is shut down on every path.
279 private boolean kafkaConnectorshuttask() {
280 Callable<Boolean> run = new Callable<Boolean>() {
282 public Boolean call() throws Exception {
283 // your code to be timed
// NOTE(review): debug output to stdout; the class logger (log) is available.
285 System.out.println("consumer closing....." + kConsumer);
287 } catch (Exception e) {
// Failures inside the task are logged at info level and not rethrown here.
// NOTE(review): the log text contains the typo "erorr".
288 log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
290 log.info("Kafka connection closure with in 15 seconds by a Executors task");
// NOTE(review): raw RunnableFuture / FutureTask; this is why the result of get()
// needs the (Boolean) cast below. FutureTask<Boolean> would remove the cast.
295 RunnableFuture future = new FutureTask(run);
296 ExecutorService service = Executors.newSingleThreadExecutor();
297 service.execute(future);
298 Boolean result = null;
300 result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait up to 15 seconds
302 } catch (TimeoutException ex) {
303 // timed out. Try to stop the code if possible.
304 log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task");
306 } catch (Exception ex) {
307 // not necessarily a timeout: any other exception thrown by future.get() lands here.
// NOTE(review): the message below still says "Timeout Occured" although this
// branch handles non-timeout failures; only the appended exception tells them apart.
308 log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex);
317 * getConsumerGroup() returns Consumer group
321 public String getConsumerGroup() {
326 * getConsumerId returns Consumer Id
330 public String getConsumerId() {
335 * getState returns kafkaconsumer state
339 private KafkaConsumer.State getState() {
344 * setState() sets the kafkaConsumer state
348 private void setState(KafkaConsumer.State state) {
// Instance state. The commented-out declarations (fConnector, fStream, slf4j
// logger) are dead code.
352 // private ConsumerConnector fConnector;
// Identity of this consumer; combined by getName() and fLogTag.
353 private final String fTopic;
354 private final String fGroup;
355 private final String fId;
// Prefix for log lines, built in the constructor as fGroup + "(" + fId + ")/" + fTopic.
356 private final String fLogTag;
357 // private final KafkaStream<byte[], byte[]> fStream;
// Underlying Kafka client; created and subscribed in the constructor.
358 private final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> kConsumer;
// Creation timestamp (System.currentTimeMillis()), set in the constructor.
359 private long fCreateTimeMs;
// Last-use timestamp; initialised to fCreateTimeMs and refreshed by touch().
360 private long fLastTouch;
// Lifecycle state; set to OPENED in the constructor and to CLOSED by close().
362 private KafkaConsumer.State state;
363 private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
// Records fetched by poll() in nextMessage() and not yet handed to the caller.
364 private final LinkedBlockingQueue<ConsumerRecord<String,String>> fPendingMsgs;
365 // private static final Logger log =
366 // LoggerFactory.getLogger(KafkaConsumer.class);
// Commits offsets by calling kConsumer.commitSync().
// When the state is CLOSED a warning is logged first; the statement that leaves
// that branch is not visible in this excerpt - confirm that commitSync() is not
// reached on a closed consumer.
// NOTE(review): this method is not synchronized, unlike nextMessage() and
// close(), which also use kConsumer. The Kafka client is documented as not safe
// for multi-threaded access - confirm that callers serialize these calls.
369 public void commitOffsets() {
370 if (getState() == KafkaConsumer.State.CLOSED) {
371 log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
374 kConsumer.commitSync();
375 // fConsumer.close();
// Stub: only the auto-generated TODO is visible in the body, so offsetval appears
// to be ignored. NOTE(review): confirm against the full file, and either
// implement the method or document that setting the offset is unsupported.
382 public void setOffset(long offsetval) {
383 // TODO Auto-generated method stub