1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.backends.kafka;
24 import java.util.HashMap;
25 import java.util.List;
28 import kafka.consumer.ConsumerIterator;
29 import kafka.consumer.KafkaStream;
30 import kafka.javaapi.consumer.ConsumerConnector;
31 import kafka.message.MessageAndMetadata;
33 //import org.slf4j.Logger;
34 //import org.slf4j.LoggerFactory;
36 import com.att.eelf.configuration.EELFLogger;
37 import com.att.eelf.configuration.EELFManager;
39 import com.att.nsa.cambria.backends.Consumer;
42 * A consumer instance that's created per-request. These are stateless so that
43 * clients can connect to this service as a proxy.
48 public class KafkaConsumer implements Consumer {
54 * KafkaConsumer() is constructor. It has following 4 parameters:-
62 public KafkaConsumer(String topic, String group, String id, ConsumerConnector cc) {
68 fCreateTimeMs = System.currentTimeMillis();
69 fLastTouch = fCreateTimeMs;
71 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
74 state = KafkaConsumer.State.OPENED;
76 final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
77 topicCountMap.put(fTopic, 1);
78 final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = fConnector
79 .createMessageStreams(topicCountMap);
80 final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(fTopic);
81 fStream = streams.iterator().next();
85 /** getName() method returns string type value.
86 * returns 3 parameters in string:-
90 public String getName() {
91 return fTopic + " : " + fGroup + " : " + fId;
94 /** getCreateTimeMs() method returns long type value.
95 * returns fCreateTimeMs variable value
99 public long getCreateTimeMs() {
100 return fCreateTimeMs;
103 /** getLastAccessMs() method returns long type value.
104 * returns fLastTouch variable value
108 public long getLastAccessMs() {
114 * nextMessage() is synchronized method that means at a time only one object can access it.
115 * getName() method returns String which is of type Consumer.Message
118 public synchronized Consumer.Message nextMessage() {
119 if (getState() == KafkaConsumer.State.CLOSED) {
120 log.warn("nextMessage() called on closed KafkaConsumer " + getName());
125 ConsumerIterator<byte[], byte[]> it = fStream.iterator();
127 final MessageAndMetadata<byte[], byte[]> msg = it.next();
128 offset = msg.offset();
130 return new Consumer.Message() {
132 public long getOffset() {
137 public String getMessage() {
138 return new String(msg.message());
142 } catch (kafka.consumer.ConsumerTimeoutException x) {
143 log.debug(fLogTag + ": ConsumerTimeoutException in Kafka consumer; returning null. ");
144 } catch (java.lang.IllegalStateException x) {
145 log.error(fLogTag + ": Illegal state exception in Kafka consumer; dropping stream. " + x.getMessage());
151 /** getOffset() method returns long type value.
152 * returns offset variable value
156 public long getOffset() {
161 * commitOffsets() method will be called on closed of KafkaConsumer.
165 public void commitOffsets() {
166 if (getState() == KafkaConsumer.State.CLOSED) {
167 log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
170 fConnector.commitOffsets();
174 * updating fLastTouch with current time in ms
176 public void touch() {
177 fLastTouch = System.currentTimeMillis();
180 /** getLastTouch() method returns long type value.
181 * returns fLastTouch variable value
184 public long getLastTouch() {
189 * setting the kafkaConsumer state to closed
191 public synchronized void close() {
192 if (getState() == KafkaConsumer.State.CLOSED) {
193 log.warn("close() called on closed KafkaConsumer " + getName());
197 setState(KafkaConsumer.State.CLOSED);
198 fConnector.shutdown();
202 * getConsumerGroup() returns Consumer group
205 public String getConsumerGroup() {
210 * getConsumerId returns Consumer Id
213 public String getConsumerId() {
218 * getState returns kafkaconsumer state
221 private KafkaConsumer.State getState() {
226 * setState() sets the kafkaConsumer state
229 private void setState(KafkaConsumer.State state) {
233 private ConsumerConnector fConnector;
234 private final String fTopic;
235 private final String fGroup;
236 private final String fId;
237 private final String fLogTag;
238 private final KafkaStream<byte[], byte[]> fStream;
239 private long fCreateTimeMs;
240 private long fLastTouch;
242 private KafkaConsumer.State state;
243 private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
244 //private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);