1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
37 import org.apache.commons.lang.StringUtils;
38 import org.apache.kafka.clients.consumer.ConsumerRecord;
39 import org.apache.kafka.clients.consumer.ConsumerRecords;
40 import org.apache.kafka.clients.consumer.KafkaConsumer;
41 import org.apache.kafka.common.KafkaException;
43 import org.onap.dmaap.dmf.mr.backends.Consumer;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
47 import com.att.eelf.configuration.EELFLogger;
48 import com.att.eelf.configuration.EELFManager;
51 * A consumer instance that's created per-request. These are stateless so that
52 * clients can connect to this service as a proxy.
57 public class Kafka011Consumer implements Consumer {
64 * KafkaConsumer() is constructor. It has following 4 parameters:-
73 public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
74 KafkaLiveLockAvoider2 klla) throws Exception {
78 fCreateTimeMs = System.currentTimeMillis();
79 fLastTouch = fCreateTimeMs;
80 fPendingMsgs = new LinkedBlockingQueue<>();
81 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
83 state = Kafka011Consumer.State.OPENED;
85 fKafkaLiveLockAvoider = klla;
87 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
89 if (StringUtils.isNotEmpty(consumerTimeOut)) {
90 consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
92 synchronized (kConsumer) {
93 kConsumer.subscribe(Arrays.asList(topic));
97 private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
98 return new Consumer.Message() {
100 public long getOffset() {
101 offset = msg.offset();
106 public String getMessage() {
107 return new String(msg.value());
113 public synchronized Consumer.Message nextMessage() {
116 if (fPendingMsgs.isEmpty()) {
117 return makeMessage(fPendingMsgs.take());
119 } catch (InterruptedException x) {
120 log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
124 Callable<Boolean> run = new Callable<Boolean>() {
126 public Boolean call() throws Exception {
128 ConsumerRecords<String, String> records;
129 synchronized (kConsumer) {
130 records = kConsumer.poll(500);
132 for (ConsumerRecord<String, String> record : records) {
134 fPendingMsgs.offer(record);
137 } catch (KafkaException x) {
138 log.debug(fLogTag + ": KafkaException " + x.getMessage());
140 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
141 log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
151 @SuppressWarnings({ "rawtypes", "unchecked" })
152 RunnableFuture future = new FutureTask(run);
153 ExecutorService service = Executors.newSingleThreadExecutor();
154 service.execute(future);
156 future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
158 } catch (TimeoutException ex) {
159 // timed out. Try to stop the code if possible.
160 String apiNodeId = null;
162 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
163 } catch (UnknownHostException e1) {
164 // TODO Auto-generated catch block
165 log.error("unable to get the localhost address");
169 if (fKafkaLiveLockAvoider != null)
170 fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
171 } catch (Exception e) {
172 log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
175 forcePollOnConsumer();
177 } catch (Exception ex) {
178 // timed out. Try to stop the code if possible.
188 * getName() method returns string type value. returns 3 parameters in
189 * string:- fTopic,fGroup,fId
193 public String getName() {
194 return fTopic + " : " + fGroup + " : " + fId;
198 * getCreateTimeMs() method returns long type value. returns fCreateTimeMs
204 public long getCreateTimeMs() {
205 return fCreateTimeMs;
208 public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
213 * getLastAccessMs() method returns long type value. returns fLastTouch
219 public long getLastAccessMs() {
224 * getOffset() method returns long type value. returns offset variable value
229 public long getOffset() {
234 * commit offsets commitOffsets() method will be called on closed of
240 * public void commitOffsets() { if (getState() ==
241 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
242 * on closed KafkaConsumer " + getName()); return; }
243 * fConnector.commitOffsets(); }
247 * updating fLastTouch with current time in ms
249 public void touch() {
250 fLastTouch = System.currentTimeMillis();
254 * getLastTouch() method returns long type value. returns fLastTouch
258 public long getLastTouch() {
263 * setting the kafkaConsumer state to closed
266 public boolean close() {
267 if (getState() == Kafka011Consumer.State.CLOSED) {
269 log.error("close() called on closed KafkaConsumer " + getName());
274 boolean retVal = kafkaConnectorshuttask();
279 /* time out if the kafka shutdown fails for some reason */
281 private boolean kafkaConnectorshuttask() {
282 Callable<Boolean> run = new Callable<Boolean>() {
284 public Boolean call() throws Exception {
290 } catch (Exception e) {
291 log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
292 throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
295 log.info("Kafka connection closure with in 15 seconds by a Executors task");
301 @SuppressWarnings({ "rawtypes", "unchecked" })
302 RunnableFuture future = new FutureTask(run);
303 ExecutorService service = Executors.newSingleThreadExecutor();
304 service.execute(future);
306 future.get(200, TimeUnit.SECONDS); // wait 1
308 } catch (TimeoutException ex) {
309 // timed out. Try to stop the code if possible.
310 log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
312 setState(Kafka011Consumer.State.OPENED);
313 } catch (Exception ex) {
314 // timed out. Try to stop the code if possible.
315 log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
318 setState(Kafka011Consumer.State.OPENED);
322 setState(Kafka011Consumer.State.CLOSED);
326 public void forcePollOnConsumer() {
327 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
332 * getConsumerGroup() returns Consumer group
336 public String getConsumerGroup() {
341 * getConsumerId returns Consumer Id
345 public String getConsumerId() {
350 * getState returns kafkaconsumer state
354 private Kafka011Consumer.State getState() {
359 * setState() sets the kafkaConsumer state
363 private void setState(Kafka011Consumer.State state) {
368 private final String fTopic;
369 private final String fGroup;
370 private final String fId;
371 private final String fLogTag;
373 private KafkaConsumer<String, String> kConsumer;
374 private long fCreateTimeMs;
375 private long fLastTouch;
377 private Kafka011Consumer.State state;
378 private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
379 private int consumerPollTimeOut=5;
380 private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
381 private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
384 public void commitOffsets() {
385 if (getState() == Kafka011Consumer.State.CLOSED) {
386 log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
389 kConsumer.commitSync();
395 public void setOffset(long offsetval) {
400 public void setConsumerCache(KafkaConsumerCache cache) {