1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.Arrays;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.FutureTask;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.RunnableFuture;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
36 import org.apache.commons.lang.StringUtils;
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
42 import org.onap.dmaap.dmf.mr.backends.Consumer;
43 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
46 import com.att.eelf.configuration.EELFLogger;
47 import com.att.eelf.configuration.EELFManager;
50 * A consumer instance that's created per-request. These are stateless so that
51 * clients can connect to this service as a proxy.
56 public class Kafka011Consumer implements Consumer {
63 * KafkaConsumer() is constructor. It has following 4 parameters:-
// Constructor: binds this per-request consumer to (topic, group, clientId), an
// already-created KafkaConsumer, and a live-lock avoider, then subscribes to the topic.
// NOTE(review): this listing is elided — assignments of fTopic/fGroup/fId/kConsumer
// from the parameters topic/group/id/cc presumably occur on elided lines; confirm.
72 public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
73 KafkaLiveLockAvoider2 klla) throws Exception {
// Record creation time; last-touch starts equal to creation time.
77 fCreateTimeMs = System.currentTimeMillis();
78 fLastTouch = fCreateTimeMs;
// Buffer for records fetched by poll() until nextMessage() hands them to a client.
79 fPendingMsgs = new LinkedBlockingQueue<>();
80 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
82 state = Kafka011Consumer.State.OPENED;
84 fKafkaLiveLockAvoider = klla;
// Optional override of the poll timeout (seconds) from the message-router
// properties; the property-key argument is on an elided continuation line.
86 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
88 if (StringUtils.isNotEmpty(consumerTimeOut)) {
// NOTE(review): Integer.parseInt throws NumberFormatException on a malformed,
// non-empty property value; no catch is visible in this listing.
89 consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
// KafkaConsumer is not thread-safe, so all access to it is serialized on the instance.
91 synchronized (kConsumer) {
92 kConsumer.subscribe(Arrays.asList(topic));
// Adapts a raw Kafka ConsumerRecord to the Cambria Consumer.Message interface.
96 private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
97 return new Consumer.Message() {
99 public long getOffset() {
// Side effect: caches the record's offset in the enclosing consumer's "offset"
// field (declared on an elided line; see getOffset() below) before returning it.
100 offset = msg.offset();
105 public String getMessage() {
// NOTE(review): new String(...) is a redundant copy — msg.value() is already a String.
106 return new String(msg.value());
// Returns the next available message for this consumer. First drains the local
// pending queue; only when it is empty does it poll Kafka, running the poll on a
// throwaway single-thread executor so the caller can abandon it after
// consumerPollTimeOut seconds. On timeout it releases the live-lock-avoider lock
// for this node/topic/group and forces a wakeup poll on the stuck consumer.
112 public synchronized Consumer.Message nextMessage() {
115 if (!fPendingMsgs.isEmpty()) {
116 return makeMessage(fPendingMsgs.take());
118 } catch (InterruptedException x) {
119 log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
// Restore the interrupt flag after catching InterruptedException.
121 Thread.currentThread().interrupt();
// Background task: one short (500 ms) Kafka poll whose records are buffered into fPendingMsgs.
124 Callable<Boolean> run = new Callable<Boolean>() {
126 public Boolean call() throws Exception {
128 ConsumerRecords<String, String> records;
// Serialize access — KafkaConsumer is not thread-safe.
129 synchronized (kConsumer) {
130 records = kConsumer.poll(500);
132 for (ConsumerRecord<String, String> record : records) {
134 fPendingMsgs.offer(record);
137 } catch (KafkaException x) {
138 log.debug(fLogTag + ": KafkaException ", x);
140 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
141 log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
// NOTE(review): a new single-thread ExecutorService is created on every call and is
// never shut down in the visible lines — confirm a shutdown exists on an elided
// line, otherwise worker threads leak on each nextMessage() invocation.
150 @SuppressWarnings({ "rawtypes", "unchecked" })
151 RunnableFuture future = new FutureTask(run);
152 ExecutorService service = Executors.newSingleThreadExecutor();
153 service.execute(future);
// Bounded wait for the poll task; consumerPollTimeOut is in seconds (default 5).
155 future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
157 } catch (TimeoutException ex) {
// NOTE(review): message text "in in" is duplicated in the source string.
158 log.error("TimeoutException in in Kafka consumer ", ex);
159 // timed out. Try to stop the code if possible.
160 String apiNodeId = null;
// Identify this API node (host:port) for the live-lock avoider bookkeeping.
162 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
163 } catch (UnknownHostException e1) {
164 log.error("unable to get the localhost address ", e1);
// Release the consumer-group lock held for this node so other nodes can proceed...
168 if (fKafkaLiveLockAvoider != null)
169 fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
170 } catch (Exception e) {
171 log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
// ...then force a wakeup/poll on this consumer so it can make progress.
174 forcePollOnConsumer();
176 } catch (Exception ex) {
177 log.error("Exception in in Kafka consumer ", ex);
178 // timed out. Try to stop the code if possible.
188 * getName() method returns string type value. returns 3 parameters in
189 * string:- fTopic,fGroup,fId
193 public String getName() {
194 return fTopic + " : " + fGroup + " : " + fId;
198 * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
204 public long getCreateTimeMs() {
205 return fCreateTimeMs;
// Exposes the underlying (non-thread-safe) KafkaConsumer; body elided in this listing.
208 public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
213 * getLastAccessMs() method returns long type value. returns fLastTouch
// Last-access timestamp in ms; presumably returns fLastTouch (body elided) — confirm.
219 public long getLastAccessMs() {
224 * getOffset() method returns long type value. returns offset variable value
// Offset of the most recently delivered record; presumably returns the cached
// "offset" field set by makeMessage() (body elided) — confirm.
229 public long getOffset() {
234 * commit offsets commitOffsets() method will be called on closed of
240 * public void commitOffsets() { if (getState() ==
241 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
242 * on closed KafkaConsumer " + getName()); return; }
243 * fConnector.commitOffsets(); }
247 * updating fLastTouch with current time in ms
249 public void touch() {
250 fLastTouch = System.currentTimeMillis();
254 * getLastTouch() method returns long type value. returns fLastTouch
// Accessor for fLastTouch; body elided in this listing.
258 public long getLastTouch() {
263 * setting the kafkaConsumer state to closed
// Closes this consumer, refusing double-close, and delegates the actual Kafka
// client shutdown to kafkaConnectorshuttask(), which applies its own timeout.
266 public boolean close() {
267 if (getState() == Kafka011Consumer.State.CLOSED) {
// Double-close is logged as an error; the early return is on an elided line.
269 log.error("close() called on closed KafkaConsumer " + getName());
274 boolean retVal = kafkaConnectorshuttask();
279 /* time out if the kafka shutdown fails for some reason */
// Runs the Kafka client shutdown on a throwaway single-thread executor so a hung
// close cannot block the caller forever. On timeout or failure the state is put
// back to OPENED; on success it is set to CLOSED.
// NOTE(review): the wait below is 200 seconds, but the log messages claim
// "15 seconds" and "300 seconds" — the messages and the actual timeout disagree.
281 private boolean kafkaConnectorshuttask() {
282 Callable<Boolean> run = new Callable<Boolean>() {
284 public Boolean call() throws Exception {
290 } catch (Exception e) {
291 log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
// Re-wraps the failure so future.get() below surfaces it as an ExecutionException.
292 throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
295 log.info("Kafka connection closure with in 15 seconds by a Executors task");
301 @SuppressWarnings({ "rawtypes", "unchecked" })
302 RunnableFuture future = new FutureTask(run);
303 ExecutorService service = Executors.newSingleThreadExecutor();
304 service.execute(future);
// Bounded wait for the shutdown task to complete.
306 future.get(200, TimeUnit.SECONDS); // wait 1
308 } catch (TimeoutException ex) {
309 // timed out. Try to stop the code if possible.
310 log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
// Close did not finish in time: leave the consumer usable rather than half-closed.
312 setState(Kafka011Consumer.State.OPENED);
313 } catch (Exception ex) {
314 // timed out. Try to stop the code if possible.
315 log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
317 setState(Kafka011Consumer.State.OPENED);
// Reached on successful shutdown: mark the consumer closed.
321 setState(Kafka011Consumer.State.CLOSED);
325 public void forcePollOnConsumer() {
326 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
331 * getConsumerGroup() returns Consumer group
// Accessor for fGroup; body elided in this listing.
335 public String getConsumerGroup() {
340 * getConsumerId returns Consumer Id
// Accessor for fId; body elided.
344 public String getConsumerId() {
349 * getState returns kafkaconsumer state
// Accessor for the private lifecycle state; body elided.
353 private Kafka011Consumer.State getState() {
358 * setState() sets the kafkaConsumer state
// Mutator for the lifecycle state; body elided.
362 private void setState(Kafka011Consumer.State state) {
// Identity of this consumer — topic, consumer group, client id — fixed at construction.
367 private final String fTopic;
368 private final String fGroup;
369 private final String fId;
// Precomputed "group(id)/topic" prefix used in log messages.
370 private final String fLogTag;
// Underlying Kafka client; not thread-safe, so callers synchronize on it.
372 private KafkaConsumer<String, String> kConsumer;
// Creation and last-activity timestamps (ms since epoch).
373 private long fCreateTimeMs;
374 private long fLastTouch;
// Lifecycle state (OPENED/CLOSED); the enum declaration is elided from this listing.
376 private Kafka011Consumer.State state;
377 private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
// Poll timeout in seconds; default 5, overridable via a message-router property.
378 private int consumerPollTimeOut=5;
379 private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
// Records fetched from Kafka but not yet handed to a client via nextMessage().
380 private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
383 public void commitOffsets() {
384 if (getState() == Kafka011Consumer.State.CLOSED) {
385 log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
388 kConsumer.commitSync();
394 public void setOffset(long offsetval) {
399 public void setConsumerCache(KafkaConsumerCache cache) {