1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
37 import org.apache.commons.lang.StringUtils;
38 import org.apache.kafka.clients.consumer.ConsumerRecord;
39 import org.apache.kafka.clients.consumer.ConsumerRecords;
40 import org.apache.kafka.clients.consumer.KafkaConsumer;
41 import org.apache.kafka.common.KafkaException;
43 import org.onap.dmaap.dmf.mr.backends.Consumer;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
47 import com.att.eelf.configuration.EELFLogger;
48 import com.att.eelf.configuration.EELFManager;
51 * A consumer instance that's created per-request. These are stateless so that
52 * clients can connect to this service as a proxy.
57 public class Kafka011Consumer implements Consumer {
64 * KafkaConsumer() is constructor. It has following 4 parameters:-
/**
 * Builds a per-request consumer wrapper for the given topic/group/client id:
 * records creation and last-touch timestamps, wires the live-lock avoider,
 * reads an optional poll-timeout override (seconds) from AJSC properties, and
 * subscribes the shared KafkaConsumer to the topic.
 *
 * NOTE(review): several original lines are missing from this excerpt — the
 * assignments of fTopic/fGroup/fId/kConsumer from topic/group/id/cc are
 * presumably among them. Confirm against the full source file.
 */
73 public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
74 KafkaLiveLockAvoider2 klla) throws Exception {
// Creation time doubles as the initial "last access" stamp.
78 fCreateTimeMs = System.currentTimeMillis();
79 fLastTouch = fCreateTimeMs;
80 fPendingMsgs = new LinkedBlockingQueue<>();
81 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
83 state = Kafka011Consumer.State.OPENED;
85 fKafkaLiveLockAvoider = klla;
// Optional override of the default poll timeout (seconds) via AJSC properties;
// the property-key argument is on a line missing from this excerpt.
87 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
89 if (StringUtils.isNotEmpty(consumerTimeOut)) {
90 consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
// KafkaConsumer is not thread-safe; all use of kConsumer in this class is
// guarded by its own monitor.
92 synchronized (kConsumer) {
93 kConsumer.subscribe(Arrays.asList(topic));
/**
 * Adapts a Kafka ConsumerRecord to the backend-neutral Consumer.Message API.
 * NOTE(review): getOffset() also writes the record offset into an enclosing
 * `offset` field whose declaration is not visible in this excerpt — confirm.
 */
97 private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
98 return new Consumer.Message() {
100 public long getOffset() {
101 offset = msg.offset();
106 public String getMessage() {
// Copies the record value; msg.value() is already a String here.
107 return new String(msg.value());
/**
 * Returns the next message for this consumer. Buffered records in
 * fPendingMsgs are served first; otherwise one poll of the shared
 * KafkaConsumer is run on a one-shot executor, bounded by
 * consumerPollTimeOut seconds. The return paths after the poll are on lines
 * missing from this excerpt — presumably null when nothing arrives; confirm.
 */
113 public synchronized Consumer.Message nextMessage() {
// NOTE(review): take() on an EMPTY queue blocks indefinitely, and the warn
// text below says "After size>0" — this condition looks inverted relative to
// the original (likely !fPendingMsgs.isEmpty()). Verify against the full file.
116 if (fPendingMsgs.isEmpty()) {
117 return makeMessage(fPendingMsgs.take());
119 } catch (InterruptedException x) {
120 log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
// Restore the interrupt flag so callers can still observe the interruption.
122 Thread.currentThread().interrupt();
// Poll task: drain one poll() worth of records into the pending queue.
125 Callable<Boolean> run = new Callable<Boolean>() {
127 public Boolean call() throws Exception {
129 ConsumerRecords<String, String> records;
// KafkaConsumer is not thread-safe; every use is guarded by kConsumer's monitor.
130 synchronized (kConsumer) {
131 records = kConsumer.poll(500);
133 for (ConsumerRecord<String, String> record : records) {
135 fPendingMsgs.offer(record);
138 } catch (KafkaException x) {
139 log.debug(fLogTag + ": KafkaException ", x);
141 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
142 log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
// Run the poll under a hard timeout so a wedged consumer cannot hang the
// request thread. NOTE(review): a new single-thread executor is created per
// call and is not visibly shut down in this excerpt — possible thread leak.
151 @SuppressWarnings({ "rawtypes", "unchecked" })
152 RunnableFuture future = new FutureTask(run);
153 ExecutorService service = Executors.newSingleThreadExecutor();
154 service.execute(future);
156 future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
158 } catch (TimeoutException ex) {
159 log.error("TimeoutException in in Kafka consumer ", ex);
160 // timed out. Try to stop the code if possible.
// Identify this API node (host:port) for the live-lock avoider below.
161 String apiNodeId = null;
163 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
164 } catch (UnknownHostException e1) {
165 log.error("unable to get the localhost address ", e1);
// On timeout, release any live-lock held for this consumer group, then force
// a poll on matching cached consumers to wake them up.
169 if (fKafkaLiveLockAvoider != null)
170 fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
171 } catch (Exception e) {
172 log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
175 forcePollOnConsumer();
177 } catch (Exception ex) {
178 log.error("Exception in in Kafka consumer ", ex);
179 // timed out. Try to stop the code if possible.
/**
 * Returns a human-readable identity string: "topic : group : clientId".
 */
194 public String getName() {
195 return fTopic + " : " + fGroup + " : " + fId;
/** Returns the epoch-ms timestamp at which this consumer wrapper was created. */
205 public long getCreateTimeMs() {
206 return fCreateTimeMs;
/** Exposes the underlying KafkaConsumer (shared, not thread-safe); body not visible in this excerpt. */
209 public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
/** Returns fLastTouch — the epoch-ms time of the most recent access (see touch()). */
220 public long getLastAccessMs() {
/** Returns the `offset` field — presumably the last offset cached by makeMessage(); declaration not visible here. */
230 public long getOffset() {
235 * commit offsets commitOffsets() method will be called on closed of
241 * public void commitOffsets() { if (getState() ==
242 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
243 * on closed KafkaConsumer " + getName()); return; }
244 * fConnector.commitOffsets(); }
/** Refreshes fLastTouch to the current epoch-ms time. */
250 public void touch() {
251 fLastTouch = System.currentTimeMillis();
/** Returns fLastTouch (epoch ms); see touch(). */
259 public long getLastTouch() {
/**
 * Closes this consumer. A double close is logged as an error (the early
 * return is on a line missing from this excerpt); otherwise the actual Kafka
 * shutdown is delegated to kafkaConnectorshuttask(), which bounds it with a
 * timeout. NOTE(review): handling of retVal is not visible here — confirm.
 */
267 public boolean close() {
268 if (getState() == Kafka011Consumer.State.CLOSED) {
270 log.error("close() called on closed KafkaConsumer " + getName());
275 boolean retVal = kafkaConnectorshuttask();
/*
 * Runs the Kafka shutdown on a one-shot executor so a hung close cannot block
 * forever. On timeout or failure the state is reverted to OPENED; on the
 * success path it is set to CLOSED.
 * NOTE(review): the wait below is 200 seconds while the log messages mention
 * 15 and 300 seconds — the messages look stale; confirm the intended bound.
 */
282 private boolean kafkaConnectorshuttask() {
283 Callable<Boolean> run = new Callable<Boolean>() {
285 public Boolean call() throws Exception {
// The guarded close of kConsumer is on lines missing from this excerpt.
291 } catch (Exception e) {
// NOTE(review): "erorr" typo is inside a runtime string; left untouched here.
292 log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
293 throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
296 log.info("Kafka connection closure with in 15 seconds by a Executors task");
302 @SuppressWarnings({ "rawtypes", "unchecked" })
303 RunnableFuture future = new FutureTask(run);
// NOTE(review): executor is not visibly shut down in this excerpt — possible
// thread leak; confirm against the full source.
304 ExecutorService service = Executors.newSingleThreadExecutor();
305 service.execute(future);
307 future.get(200, TimeUnit.SECONDS); // wait 1
309 } catch (TimeoutException ex) {
310 // timed out. Try to stop the code if possible.
311 log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
// Shutdown did not complete: keep the consumer usable (OPENED) instead of
// marking it closed. Presumably the catch branches return before line 322.
313 setState(Kafka011Consumer.State.OPENED);
314 } catch (Exception ex) {
315 // timed out. Try to stop the code if possible.
316 log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
318 setState(Kafka011Consumer.State.OPENED);
// Success path: mark the consumer closed.
322 setState(Kafka011Consumer.State.CLOSED);
/** Forces a poll on matching cached consumers via Kafka011ConsumerUtil. */
326 public void forcePollOnConsumer() {
327 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
/** Returns the consumer group name (body not visible in this excerpt). */
336 public String getConsumerGroup() {
/** Returns the consumer (client) id (body not visible in this excerpt). */
345 public String getConsumerId() {
/** Returns the current lifecycle state — OPENED or CLOSED (body not visible in this excerpt). */
354 private Kafka011Consumer.State getState() {
/** Sets the lifecycle state (body not visible in this excerpt). */
363 private void setState(Kafka011Consumer.State state) {
// Immutable identity of this consumer instance.
368 private final String fTopic;
369 private final String fGroup;
370 private final String fId;
// Log prefix built in the constructor: "group(id)/topic".
371 private final String fLogTag;
// Underlying KafkaConsumer; not thread-safe — always accessed under its monitor.
373 private KafkaConsumer<String, String> kConsumer;
374 private long fCreateTimeMs;
// Epoch ms of the most recent access; refreshed by touch().
375 private long fLastTouch;
// Lifecycle state (OPENED/CLOSED).
377 private Kafka011Consumer.State state;
378 private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
// Poll timeout in seconds; defaults to 5, overridable via AJSC properties in the constructor.
379 private int consumerPollTimeOut=5;
380 private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
// Records fetched by poll() but not yet handed to a client via nextMessage().
381 private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
/**
 * Synchronously commits current offsets via the underlying consumer.
 * A warning is logged when the consumer is already closed (the early return
 * is on a line missing from this excerpt — confirm).
 */
384 public void commitOffsets() {
385 if (getState() == Kafka011Consumer.State.CLOSED) {
386 log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
389 kConsumer.commitSync();
/** Setter — presumably writes the cached `offset` field; body not visible in this excerpt. */
395 public void setOffset(long offsetval) {
/** Injects the KafkaConsumerCache; body not visible in this excerpt. */
400 public void setConsumerCache(KafkaConsumerCache cache) {