1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
42 import org.onap.dmaap.dmf.mr.backends.Consumer;
43 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
47 import com.att.eelf.configuration.EELFLogger;
48 import com.att.eelf.configuration.EELFManager;
51 * A consumer instance that's created per-request. These are stateless so that
52 * clients can connect to this service as a proxy.
57 public class Kafka011Consumer implements Consumer {
64 * KafkaConsumer() is constructor. It has following 4 parameters:-
/**
 * Constructs a per-request consumer bound to one topic/group/client-id triple
 * and subscribes the supplied KafkaConsumer to the topic.
 * NOTE(review): this span is an elided view — assignments of fTopic/fGroup/fId
 * and closing braces are not visible here.
 *
 * @param topic topic to subscribe to
 * @param group Kafka consumer group name
 * @param id    client id within the group
 * @param cc    underlying KafkaConsumer; not thread-safe, so all access in this
 *              class is guarded by synchronized (kConsumer)
 * @param klla  live-lock avoider invoked when a poll times out; may be null
 * @throws Exception propagated from subscription/setup
 */
public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
KafkaLiveLockAvoider2 klla) throws Exception {
// creation time doubles as the initial "last touched" timestamp
fCreateTimeMs = System.currentTimeMillis();
fLastTouch = fCreateTimeMs;
// buffer for records fetched by the background poll task, drained by nextMessage()
fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
// log prefix identifying this consumer instance: group(id)/topic
fLogTag = fGroup + "(" + fId + ")/" + fTopic;
state = Kafka011Consumer.State.OPENED;
fKafkaLiveLockAvoider = klla;
// KafkaConsumer is single-threaded by contract; serialize all use of it
synchronized (kConsumer) {
kConsumer.subscribe(Arrays.asList(topic));
/**
 * Wraps a Kafka ConsumerRecord in the backend-neutral Consumer.Message
 * interface expected by callers of nextMessage().
 * NOTE(review): span is elided — the anonymous class body is only partially visible.
 *
 * @param msg the record fetched from Kafka
 * @return an adapter exposing the record's offset and value
 */
private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
return new Consumer.Message() {
public long getOffset() {
// side effect: remembers the record's offset in the enclosing consumer's field
offset = msg.offset();
public String getMessage() {
// msg.value() is already a String here; the copy looks redundant — TODO confirm
return new String(msg.value());
/**
 * Returns the next buffered message, or triggers a background poll of Kafka
 * (bounded to 5 seconds) to refill the buffer. On poll timeout, asks the
 * live-lock avoider to unlock this consumer group and forces a poll.
 * NOTE(review): span is elided — try openers, returns and closing braces are
 * not all visible; read alongside the full file.
 */
public synchronized Consumer.Message nextMessage() {
// fast path: serve from the already-fetched buffer
if (fPendingMsgs.size() > 0) {
return makeMessage(fPendingMsgs.take());
} catch (InterruptedException x) {
// NOTE(review): interrupt is swallowed without Thread.currentThread().interrupt()
log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
// slow path: poll Kafka on a worker thread so we can bound the wait below
Callable<Boolean> run = new Callable<Boolean>() {
public Boolean call() throws Exception {
ConsumerRecords<String, String> records;
// KafkaConsumer is not thread-safe; serialize access
synchronized (kConsumer) {
records = kConsumer.poll(500);
// transfer every fetched record into the pending buffer
for (ConsumerRecord<String, String> record : records) {
fPendingMsgs.offer(record);
} catch (KafkaException x) {
log.debug(fLogTag + ": KafkaException " + x.getMessage());
} catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
// NOTE(review): raw FutureTask and a fresh single-thread executor per call;
// the executor is never shut down here — potential thread leak
@SuppressWarnings({ "rawtypes", "unchecked" })
RunnableFuture future = new FutureTask(run);
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(future);
future.get(5, TimeUnit.SECONDS); // wait up to 5 seconds for the poll task
} catch (TimeoutException ex) {
// poll did not finish in time: identify this API node and clear any livelock lock
String apiNodeId = null;
apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
} catch (UnknownHostException e1) {
// best-effort only; apiNodeId stays null and the unlock below is attempted anyway
log.error("unable to get the localhost address");
if (fKafkaLiveLockAvoider != null)
fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
} catch (Exception e) {
log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
// wake the consumer so a subsequent poll can make progress
forcePollOnConsumer();
} catch (Exception ex) {
// non-timeout failure of the background poll task; fall through without a message
* getName() method returns string type value. returns 3 parameters in
* string:- fTopic,fGroup,fId
public String getName() {
// human-readable identity used in log messages: "topic : group : id"
return fTopic + " : " + fGroup + " : " + fId;
* getCreateTimeMs() method returns long type value. returns fCreateTimeMs
public long getCreateTimeMs() {
return fCreateTimeMs;
// exposes the underlying KafkaConsumer (body elided in this view)
public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
* getLastAccessMs() method returns long type value. returns fLastTouch
public long getLastAccessMs() {
* getOffset() method returns long type value. returns offset variable value
public long getOffset() {
228 * commit offsets commitOffsets() method will be called on closed of
234 * public void commitOffsets() { if (getState() ==
235 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
236 * on closed KafkaConsumer " + getName()); return; }
237 * fConnector.commitOffsets(); }
* updating fLastTouch with current time in ms
public void touch() {
// marks the consumer as recently used so idle-reaping logic keeps it alive
fLastTouch = System.currentTimeMillis();
* getLastTouch() method returns long type value. returns fLastTouch
public long getLastTouch() {
* setting the kafkaConsumer state to closed
public boolean close() {
// closing twice is treated as an error, not a no-op
if (getState() == Kafka011Consumer.State.CLOSED) {
log.error("close() called on closed KafkaConsumer " + getName());
// delegate the actual shutdown to a time-bounded task; see kafkaConnectorshuttask()
boolean retVal = kafkaConnectorshuttask();
/* time out if the kafka shutdown fails for some reason */
/**
 * Runs the Kafka connection shutdown on a worker thread with a hard timeout.
 * On timeout or failure the state is reverted to OPENED; on success it is
 * set to CLOSED.
 * NOTE(review): span is elided — the try openers and return statements are not
 * all visible. The log messages mention 15 and 300 seconds while the actual
 * wait below is 200 seconds; the strings look stale — confirm before relying
 * on them.
 */
private boolean kafkaConnectorshuttask() {
Callable<Boolean> run = new Callable<Boolean>() {
public Boolean call() throws Exception {
} catch (Exception e) {
log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
// NOTE(review): raw Exception rethrow loses the cause chain (message-only)
throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
log.info("Kafka connection closure with in 15 seconds by a Executors task");
// NOTE(review): raw FutureTask + per-call executor that is never shut down
@SuppressWarnings({ "rawtypes", "unchecked" })
RunnableFuture future = new FutureTask(run);
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(future);
future.get(200, TimeUnit.SECONDS); // wait up to 200 seconds for the shutdown task
} catch (TimeoutException ex) {
// shutdown task did not complete in time; consumer stays usable
log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
setState(Kafka011Consumer.State.OPENED);
} catch (Exception ex) {
// any other shutdown failure: also revert to OPENED
log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
setState(Kafka011Consumer.State.OPENED);
// success path: mark the consumer closed
setState(Kafka011Consumer.State.CLOSED);
// Asks the shared consumer utility to force a poll for this topic/group/id,
// used to nudge a stalled consumer after a poll timeout (see nextMessage()).
public void forcePollOnConsumer() {
Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
* getConsumerGroup() returns Consumer group
public String getConsumerGroup() {
* getConsumerId returns Consumer Id
public String getConsumerId() {
* getState returns kafkaconsumer state
// private: lifecycle state is only inspected/changed within this class
private Kafka011Consumer.State getState() {
* setState() sets the kafkaConsumer state
private void setState(Kafka011Consumer.State state) {
// topic this consumer instance is bound to
private final String fTopic;
// Kafka consumer group name
private final String fGroup;
// client id within the group
private final String fId;
// log prefix: group(id)/topic — built in the constructor
private final String fLogTag;
// underlying Kafka client; not thread-safe, always used inside synchronized (kConsumer)
private KafkaConsumer<String, String> kConsumer;
// creation timestamp (ms since epoch)
private long fCreateTimeMs;
// last-activity timestamp, refreshed by touch(); used for idle detection
private long fLastTouch;
// lifecycle state (OPENED/CLOSED)
private Kafka011Consumer.State state;
// optional helper to break consumer-group livelocks; may be null
private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
// records fetched by the background poll task, awaiting delivery via nextMessage()
private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
// Synchronously commits the current offsets; a warning (not an error) is
// logged and nothing is committed if the consumer is already closed.
// NOTE(review): commitSync() outside synchronized (kConsumer) is inconsistent
// with the locking used elsewhere in this class — confirm the calling thread.
public void commitOffsets() {
if (getState() == Kafka011Consumer.State.CLOSED) {
log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
kConsumer.commitSync();
// stores the given offset (body elided in this view)
public void setOffset(long offsetval) {
// injects the shared consumer cache (body elided in this view)
public void setConsumerCache(KafkaConsumerCache cache) {