1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.kafka.clients.consumer.ConsumerRecord;
29 import org.apache.kafka.clients.consumer.ConsumerRecords;
30 import org.apache.kafka.clients.consumer.KafkaConsumer;
31 import org.apache.kafka.common.KafkaException;
32 import org.onap.dmaap.dmf.mr.backends.Consumer;
33 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.util.Arrays;
38 import java.util.concurrent.*;
41 * A consumer instance that's created per-request. These are stateless so that
42 * clients can connect to this service as a proxy.
47 public class Kafka011Consumer implements Consumer {
54 * KafkaConsumer() is constructor. It has following 4 parameters:-
// Constructor: binds this per-request consumer to one topic/group/client id and
// subscribes the supplied KafkaConsumer to the topic.
// NOTE(review): this view of the source is missing lines — assignments of
// fTopic/fGroup/fId/kConsumer from the (topic, group, id, cc) parameters are
// presumably here but not visible; confirm against the full file.
63 public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
64 KafkaLiveLockAvoider2 klla) throws Exception {
// Record creation time; last-touch starts equal to creation time.
68 fCreateTimeMs = System.currentTimeMillis();
69 fLastTouch = fCreateTimeMs;
// Queue holding records fetched by poll() but not yet handed to a client.
70 fPendingMsgs = new LinkedBlockingQueue<>();
// Log prefix of the form "group(id)/topic" used by other methods.
71 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
72 fKafkaLiveLockAvoider = klla;
// Optional override of the poll timeout (seconds) from the MsgRtr property
// file; the property-name argument is on a line missing from this view.
77 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
79 if (StringUtils.isNotEmpty(consumerTimeOut)) {
// NOTE(review): parseInt will throw NumberFormatException on a malformed
// property value, propagating out of the constructor — confirm intended.
80 consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
// Serialize subscribe with other users of kConsumer (KafkaConsumer is not
// thread-safe).
82 synchronized (kConsumer) {
83 kConsumer.subscribe(Arrays.asList(topic));
// Wraps a Kafka ConsumerRecord in the backend-neutral Message interface.
// NOTE(review): parts of the anonymous class body are missing from this view
// (e.g. the return of getOffset and any other Message methods).
87 private Message makeMessage(final ConsumerRecord<String, String> msg) {
88 return new Message() {
90 public long getOffset() {
// Side effect: caches the record's offset in the enclosing consumer's
// offset field (declared elsewhere in the class).
91 offset = msg.offset();
96 public String getMessage() {
// msg.value() is already a String; the copy via new String() is redundant
// but harmless.
97 return new String(msg.value());
// Returns the next available message for this consumer, or (presumably) null
// when nothing arrives within the poll timeout — the final return is on a line
// missing from this view.
// Strategy: drain the local pending queue first; otherwise run one
// kConsumer.poll(500) on a single-thread executor, bounded by
// consumerPollTimeOut seconds, buffering any fetched records into fPendingMsgs.
103 public synchronized Message nextMessage() {
// Fast path: hand out a record already buffered by a previous poll.
106 if (!fPendingMsgs.isEmpty()) {
107 return makeMessage(fPendingMsgs.take());
109 } catch (InterruptedException x) {
110 log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
// Restore the interrupt flag so callers up the stack still see it.
112 Thread.currentThread().interrupt();
// Slow path: one bounded poll against the broker, run on a worker thread so
// the caller can time it out (KafkaConsumer.poll itself can block).
115 Callable<Boolean> run = new Callable<Boolean>() {
117 public Boolean call() throws Exception {
119 ConsumerRecords<String, String> records;
// KafkaConsumer is not thread-safe; all access is serialized on kConsumer.
120 synchronized (kConsumer) {
121 records = kConsumer.poll(500);
// Buffer everything fetched; later nextMessage() calls drain the queue.
123 for (ConsumerRecord<String, String> record : records) {
125 fPendingMsgs.offer(record);
128 } catch (KafkaException x) {
// Transient Kafka errors are only logged at debug and the poll is abandoned.
129 log.debug(fLogTag + ": KafkaException ", x);
131 } catch (IllegalStateException | IllegalArgumentException x) {
132 log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
// NOTE(review): a fresh single-thread executor is created per call and is
// never shut down in the visible code — potential thread leak; confirm a
// shutdown exists on a missing line.
141 @SuppressWarnings({ "rawtypes", "unchecked" })
142 RunnableFuture future = new FutureTask(run);
143 ExecutorService service = Executors.newSingleThreadExecutor();
144 service.execute(future);
// Bound the poll by the configured timeout (seconds, default 5).
// NOTE(review): the trailing "wait 1" comment does not match the actual
// timeout value.
146 future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
148 } catch (TimeoutException ex) {
149 log.error("TimeoutException in in Kafka consumer ", ex);
150 // timed out. Try to stop the code if possible.
// On timeout, release the live-lock-avoider lock for this node/topic/group
// so other API nodes can make progress, then force a poll to recover.
151 String apiNodeId = null;
153 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
154 } catch (UnknownHostException e1) {
155 log.error("unable to get the localhost address ", e1);
// Live-lock avoider may be absent (null) when the feature is disabled.
159 if (fKafkaLiveLockAvoider != null)
160 fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
161 } catch (Exception e) {
162 log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
165 forcePollOnConsumer();
167 } catch (Exception ex) {
168 log.error("Exception in in Kafka consumer ", ex);
169 // timed out. Try to stop the code if possible.
179 * getName() method returns string type value. returns 3 parameters in
180 * string:- fTopic,fGroup,fId
// Human-readable identity used in log messages: "topic : group : id".
184 public String getName() {
185 return fTopic + " : " + fGroup + " : " + fId;
189 * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
// Epoch-millis timestamp captured in the constructor.
195 public long getCreateTimeMs() {
196 return fCreateTimeMs;
// Exposes the underlying KafkaConsumer; callers must synchronize on it
// (see the synchronized blocks elsewhere in this class).
199 public KafkaConsumer<String, String> getConsumer() {
204 * getLastAccessMs() method returns long type value. returns fLastTouch
210 public long getLastAccessMs() {
215 * getOffset() method returns long type value. returns offset variable value
// Last offset cached by makeMessage()'s anonymous Message wrapper.
220 public long getOffset() {
225 * commit offsets commitOffsets() method will be called on closed of
// Historical, commented-out implementation kept for reference; the live
// commitOffsets() (using kConsumer.commitSync()) is defined later in the class.
231 * public void commitOffsets() { if (getState() ==
232 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
233 * on closed KafkaConsumer " + getName()); return; }
234 * fConnector.commitOffsets(); }
238 * updating fLastTouch with current time in ms
// Marks this consumer as recently used; the cache uses fLastTouch to decide
// when an idle consumer can be reaped.
240 public void touch() {
241 fLastTouch = System.currentTimeMillis();
245 * getLastTouch() method returns long type value. returns fLastTouch
249 public long getLastTouch() {
254 * setting the kafkaConsumer state to closed
// Closes this consumer. Idempotence guard: a second close() is logged as an
// error and (presumably) returns early — the return statement is on a line
// missing from this view. The actual shutdown is delegated to
// kafkaConnectorshuttask(), which bounds it with a timeout.
257 public boolean close() {
258 if (getState() == State.CLOSED) {
260 log.error("close() called on closed KafkaConsumer " + getName());
265 boolean retVal = kafkaConnectorshuttask();
/* time out if the kafka shutdown fails for some reason */
// Runs the Kafka shutdown (body partly missing from this view — presumably
// kConsumer.close()) on a single-thread executor, bounded by a timeout.
// On timeout or any other failure the state is reset to OPENED; only a clean
// shutdown reaches setState(State.CLOSED).
// NOTE(review): the comments say 15 and 300 seconds but the code waits 200 —
// confirm the intended timeout. "erorr" typos are in runtime log strings and
// are left untouched here.
272 private boolean kafkaConnectorshuttask() {
273 Callable<Boolean> run = new Callable<Boolean>() {
275 public Boolean call() throws Exception {
281 } catch (Exception e) {
282 log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
// Rethrow so the FutureTask surfaces the failure to the waiting get().
// NOTE(review): wrapping as new Exception(msg) drops the original as the
// cause chain (only its toString is kept) — consider new Exception(msg, e).
283 throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
286 log.info("Kafka connection closure with in 15 seconds by a Executors task");
// NOTE(review): as in nextMessage(), the executor is never shut down in the
// visible code — potential thread leak.
292 @SuppressWarnings({ "rawtypes", "unchecked" })
293 RunnableFuture future = new FutureTask(run);
294 ExecutorService service = Executors.newSingleThreadExecutor();
295 service.execute(future);
297 future.get(200, TimeUnit.SECONDS); // wait 1
299 } catch (TimeoutException ex) {
300 // timed out. Try to stop the code if possible.
301 log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
// Shutdown did not complete: leave the consumer usable (OPENED) and report
// failure (the return is on a line missing from this view).
303 setState(State.OPENED);
304 } catch (Exception ex) {
305 // timed out. Try to stop the code if possible.
306 log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
308 setState(State.OPENED);
// Clean shutdown path: mark closed.
312 setState(State.CLOSED);
// Asks the shared consumer utility to force a poll for this topic/group/id —
// used as a recovery step after a poll timeout in nextMessage().
316 public void forcePollOnConsumer() {
317 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
322 * getConsumerGroup() returns Consumer group
326 public String getConsumerGroup() {
331 * getConsumerId returns Consumer Id
335 public String getConsumerId() {
340 * getState returns kafkaconsumer state
// Private lifecycle accessor (OPENED/CLOSED); state field declared elsewhere.
344 private State getState() {
349 * setState() sets the kafkaConsumer state
353 private void setState(State state) {
// Identity of this consumer instance (immutable after construction).
358 private final String fTopic;
359 private final String fGroup;
360 private final String fId;
// Precomputed "group(id)/topic" prefix for log messages.
361 private final String fLogTag;
// Underlying Kafka client; not thread-safe, so all access is synchronized on it.
363 private KafkaConsumer<String, String> kConsumer;
// Creation and last-access timestamps (epoch millis) used by the consumer cache.
364 private long fCreateTimeMs;
365 private long fLastTouch;
// Optional cross-node live-lock avoider; may be null when the feature is off.
368 private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
// Poll timeout in seconds; default 5, overridable via the MsgRtr property file.
369 private int consumerPollTimeOut=5;
370 private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
// Records fetched by poll() but not yet delivered via nextMessage().
371 private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
// Synchronously commits the consumer's current offsets to Kafka.
// No-op (with a warning) when the consumer has already been closed; the early
// return is presumably on a line missing from this view.
374 public void commitOffsets() {
375 if (getState() == State.CLOSED) {
376 log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
// Blocks until the broker acknowledges the commit (or throws).
379 kConsumer.commitSync();
// Stores an offset value (field declared elsewhere); body not visible here.
385 public void setOffset(long offsetval) {
// Injects the owning consumer cache; body not visible here.
390 public void setConsumerCache(KafkaConsumerCache cache) {