1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.FutureTask;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RunnableFuture;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
42 import org.onap.dmaap.dmf.mr.backends.Consumer;
43 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
46 import com.att.eelf.configuration.EELFLogger;
47 import com.att.eelf.configuration.EELFManager;
50 * A consumer instance that's created per-request. These are stateless so that
51 * clients can connect to this service as a proxy.
56 public class Kafka011Consumer implements Consumer {
63 * Kafka011Consumer() is the constructor. It takes the following parameters:-
// Creates a consumer bound to one topic / group / id and subscribes the
// supplied Kafka client to that topic.
// NOTE(review): some statements of this constructor are not visible in this
// excerpt (presumably the assignments of fTopic, fGroup, fId and kConsumer);
// confirm they run before fLogTag is built and before kConsumer is locked.
72 public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc,
73 KafkaLiveLockAvoider2 klla) throws Exception {
// The creation time doubles as the initial "last touched" timestamp.
77 fCreateTimeMs = System.currentTimeMillis();
78 fLastTouch = fCreateTimeMs;
79 fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
// Log prefix in the form "group(id)/topic".
80 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
82 state = Kafka011Consumer.State.OPENED;
// May be null; nextMessage() checks for null before using it.
84 fKafkaLiveLockAvoider = klla;
// Optional override of the poll timeout, looked up under
// CambriaConstants.msgRtr_prop (the property key is on a line not shown
// here). nextMessage() later uses the value as a number of seconds.
// Integer.parseInt throws NumberFormatException if the configured value is
// not an integer.
86 String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
88 if (null != consumerTimeOut) {
89 consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
// Subscribe while holding the kConsumer monitor, the same lock that guards
// poll() in nextMessage().
91 synchronized (kConsumer) {
92 kConsumer.subscribe(Arrays.asList(topic));
/**
 * Wraps a Kafka record in an anonymous Consumer.Message.
 *
 * @param msg the record to expose
 * @return a Message whose accessors read from msg
 */
96 private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) {
97 return new Consumer.Message() {
99 public long getOffset() {
// Side effect: reading the offset also stores it in offset, which appears
// to be a field of the enclosing consumer (declaration not visible here).
100 offset = msg.offset();
105 public String getMessage() {
// Returns a fresh String copy of the record value.
// NOTE(review): new String(String) throws NullPointerException for a null
// argument - confirm records with a null value cannot reach this point.
106 return new String(msg.value());
/**
 * Hands out the next pending record if one is queued; otherwise submits a
 * poll task to a single-thread executor and waits for it for at most
 * consumerPollTimeOut seconds.
 *
 * NOTE(review): the return statements that follow the poll are not visible
 * in this excerpt, so what is returned after a poll is not documented here.
 */
112 public synchronized Consumer.Message nextMessage() {
// Fast path: a record from an earlier poll is still queued.
115 if (fPendingMsgs.size() > 0) {
116 return makeMessage(fPendingMsgs.take());
118 } catch (InterruptedException x) {
119 log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
123 Callable<Boolean> run = new Callable<Boolean>() {
// Poll task: polls Kafka while holding the kConsumer monitor and queues
// every returned record in fPendingMsgs.
125 public Boolean call() throws Exception {
127 ConsumerRecords<String, String> records;
128 synchronized (kConsumer) {
129 records = kConsumer.poll(500);
131 for (ConsumerRecord<String, String> record : records) {
133 fPendingMsgs.offer(record);
// KafkaException is only logged at debug level; illegal state/argument
// exceptions are logged at error level.
136 } catch (KafkaException x) {
137 log.debug(fLogTag + ": KafkaException " + x.getMessage());
139 } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
140 log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
150 @SuppressWarnings({ "rawtypes", "unchecked" })
151 RunnableFuture future = new FutureTask(run);
// A new single-thread executor is created on every call that reaches this
// point.
// NOTE(review): no shutdown of this executor is visible in this excerpt -
// confirm it is released.
152 ExecutorService service = Executors.newSingleThreadExecutor();
153 service.execute(future);
// The wait is consumerPollTimeOut seconds (default 5, overridable in the
// constructor), not the 1 suggested by the trailing comment.
155 future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
157 } catch (TimeoutException ex) {
158 // timed out. Try to stop the code if possible.
// Build "<canonical host name>:<default port>" to identify this API node;
// apiNodeId stays null if the local host cannot be resolved.
159 String apiNodeId = null;
161 apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
162 } catch (UnknownHostException e1) {
163 // TODO Auto-generated catch block
164 log.error("unable to get the localhost address");
// Ask the live-lock avoider (when one was supplied) to unlock the
// "topic::group" consumer group for this node.
168 if (fKafkaLiveLockAvoider != null)
169 fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
170 } catch (Exception e) {
// NOTE(review): the caught exception itself is not included in this log
// line.
171 log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
174 forcePollOnConsumer();
176 } catch (Exception ex) {
177 // timed out. Try to stop the code if possible.
187 * getName() returns a single string that joins three fields with " : "
188 * separators: fTopic, fGroup and fId.
// Returns "<topic> : <group> : <id>"; close() and commitOffsets() use this
// value in their log messages.
192 public String getName() {
193 return fTopic + " : " + fGroup + " : " + fId;
197 * getCreateTimeMs() returns fCreateTimeMs, the creation time in ms, as a long.
// fCreateTimeMs is taken from System.currentTimeMillis() in the constructor.
203 public long getCreateTimeMs() {
204 return fCreateTimeMs;
// Accessor for the underlying Kafka client.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns kConsumer - confirm.
207 public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() {
212 * getLastAccessMs() method returns long type value. returns fLastTouch
// NOTE(review): the body is not visible in this excerpt; the comment above
// this method says it returns fLastTouch - confirm.
218 public long getLastAccessMs() {
223 * getOffset() method returns long type value. returns offset variable value
// NOTE(review): the body is not visible in this excerpt; the comment above
// this method says it returns the offset variable, which is written by the
// Message objects created in makeMessage() - confirm.
228 public long getOffset() {
233 * commit offsets commitOffsets() method will be called on closed of
239 * public void commitOffsets() { if (getState() ==
240 * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called
241 * on closed KafkaConsumer " + getName()); return; }
242 * fConnector.commitOffsets(); }
246 * Updates fLastTouch with the current time in ms.
// Records "now" (System.currentTimeMillis()) as the last-touched time.
248 public void touch() {
249 fLastTouch = System.currentTimeMillis();
253 * getLastTouch() method returns long type value. returns fLastTouch
// NOTE(review): the body is not visible in this excerpt; the comment above
// this method says it returns fLastTouch - confirm.
257 public long getLastTouch() {
262 * setting the kafkaConsumer state to closed
// Closes the consumer by delegating to kafkaConnectorshuttask(), which runs
// the shutdown on a separate thread and waits for it with a timeout.
// NOTE(review): the return statements are not visible in this excerpt.
265 public boolean close() {
// Closing an already-closed consumer is only reported in the log.
266 if (getState() == Kafka011Consumer.State.CLOSED) {
268 log.error("close() called on closed KafkaConsumer " + getName());
273 boolean retVal = kafkaConnectorshuttask();
278 /* time out if the kafka shutdown fails for some reason */
/**
 * Runs the Kafka shutdown as a task on a single-thread executor and waits
 * for it with a timeout, updating the consumer state afterwards.
 *
 * NOTE(review): the statements inside the task's try block and the return
 * statements of this method are not visible in this excerpt.
 */
280 private boolean kafkaConnectorshuttask() {
281 Callable<Boolean> run = new Callable<Boolean>() {
283 public Boolean call() throws Exception {
289 } catch (Exception e) {
290 log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
// NOTE(review): the rethrown exception carries only the string form of e;
// the original exception is not attached as its cause.
291 throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e);
294 log.info("Kafka connection closure with in 15 seconds by a Executors task");
300 @SuppressWarnings({ "rawtypes", "unchecked" })
301 RunnableFuture future = new FutureTask(run);
// NOTE(review): no shutdown of this executor is visible in this excerpt -
// confirm it is released.
302 ExecutorService service = Executors.newSingleThreadExecutor();
303 service.execute(future);
// NOTE(review): the wait is 200 seconds, while the log messages in this
// method mention 15 and 300 seconds - confirm which value is intended.
305 future.get(200, TimeUnit.SECONDS); // wait 1
307 } catch (TimeoutException ex) {
308 // timed out. Try to stop the code if possible.
309 log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
// On timeout the consumer is marked OPENED again.
311 setState(Kafka011Consumer.State.OPENED);
312 } catch (Exception ex) {
313 // timed out. Try to stop the code if possible.
314 log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
317 setState(Kafka011Consumer.State.OPENED);
// Presumably reached only when neither catch block returned first (the
// return statements are not visible here) - confirm.
321 setState(Kafka011Consumer.State.CLOSED);
/**
 * Delegates to Kafka011ConsumerUtil.forcePollOnConsumer with this
 * consumer's topic, group and id. nextMessage() calls this after a poll
 * has timed out.
 */
325 public void forcePollOnConsumer() {
326 Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId);
331 * getConsumerGroup() returns Consumer group
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns fGroup - confirm.
335 public String getConsumerGroup() {
340 * getConsumerId returns Consumer Id
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns fId - confirm.
344 public String getConsumerId() {
349 * getState returns kafkaconsumer state
// Used by close() and commitOffsets() to detect an already-closed consumer.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the state field - confirm.
353 private Kafka011Consumer.State getState() {
358 * setState() sets the kafkaConsumer state
// Called by kafkaConnectorshuttask() to mark the consumer OPENED or CLOSED.
// NOTE(review): the body is not visible in this excerpt; presumably it
// assigns the state field - confirm.
362 private void setState(Kafka011Consumer.State state) {
// Topic, consumer group and consumer id this instance serves.
367 private final String fTopic;
368 private final String fGroup;
369 private final String fId;
// Log prefix "group(id)/topic", built in the constructor.
370 private final String fLogTag;
// Underlying Kafka client; also used as the monitor around subscribe() and
// poll().
372 private KafkaConsumer<String, String> kConsumer;
// Creation time in ms (System.currentTimeMillis() at construction).
373 private long fCreateTimeMs;
// Last-touched time in ms; starts at fCreateTimeMs and is updated by touch().
374 private long fLastTouch;
// Lifecycle state; set to OPENED in the constructor.
376 private Kafka011Consumer.State state;
// Optional helper used to unlock the consumer group after a poll timeout;
// may be null.
377 private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
// Seconds nextMessage() waits for the poll task; default 5, overridable from
// a property read in the constructor.
378 private int consumerPollTimeOut=5;
379 private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
// Records already polled from Kafka but not yet handed out by nextMessage().
380 private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;
/**
 * Commits offsets through kConsumer.commitSync(). When the consumer is
 * already CLOSED a warning is logged instead.
 *
 * NOTE(review): the statement that follows the warning is not visible in
 * this excerpt (presumably an early return) - confirm.
 */
383 public void commitOffsets() {
384 if (getState() == Kafka011Consumer.State.CLOSED) {
385 log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
// NOTE(review): unlike subscribe() and poll(), this call does not appear to
// be wrapped in synchronized (kConsumer) - confirm that is intended.
388 kConsumer.commitSync();
// NOTE(review): the body is not visible in this excerpt; presumably it
// stores offsetval in the offset field - confirm.
394 public void setOffset(long offsetval) {
399 public void setConsumerCache(KafkaConsumerCache cache) {