44c74a652761d64862d78b396e2d60f2686b82fa
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / backends / kafka / KafkaConsumer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.backends.kafka;
23
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27
28 import kafka.consumer.ConsumerIterator;
29 import kafka.consumer.KafkaStream;
30 import kafka.javaapi.consumer.ConsumerConnector;
31 import kafka.message.MessageAndMetadata;
32
33 //import org.slf4j.Logger;
34 //import org.slf4j.LoggerFactory;
35
36 import com.att.eelf.configuration.EELFLogger;
37 import com.att.eelf.configuration.EELFManager;
38
39 import com.att.nsa.cambria.backends.Consumer;
40
41 /**
42  * A consumer instance that's created per-request. These are stateless so that
43  * clients can connect to this service as a proxy.
44  * 
45  * @author author
46  *
47  */
48 public class KafkaConsumer implements Consumer {
49         private enum State {
50                 OPENED, CLOSED
51         }
52
53         /**
54          * KafkaConsumer() is constructor. It has following 4 parameters:-
55          * @param topic
56          * @param group
57          * @param id
58          * @param cc
59          * 
60          */
61         
62         public KafkaConsumer(String topic, String group, String id, ConsumerConnector cc) {
63                 fTopic = topic;
64                 fGroup = group;
65                 fId = id;
66                 fConnector = cc;
67
68                 fCreateTimeMs = System.currentTimeMillis();
69                 fLastTouch = fCreateTimeMs;
70
71                 fLogTag = fGroup + "(" + fId + ")/" + fTopic;
72                 offset = 0;
73
74                 state = KafkaConsumer.State.OPENED;
75
76                 final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
77                 topicCountMap.put(fTopic, 1);
78                 final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = fConnector
79                                 .createMessageStreams(topicCountMap);
80                 final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(fTopic);
81                 fStream = streams.iterator().next();
82         }
83
84         
85         /** getName() method returns string type value.
86          * returns 3 parameters in string:- 
87          * fTopic,fGroup,fId
88          * @Override
89          */
90         public String getName() {
91                 return fTopic + " : " + fGroup + " : " + fId;
92         }
93
94         /** getCreateTimeMs() method returns long type value.
95          * returns fCreateTimeMs variable value 
96          * @Override
97          * 
98          */
99         public long getCreateTimeMs() {
100                 return fCreateTimeMs;
101         }
102
103         /** getLastAccessMs() method returns long type value.
104          * returns fLastTouch variable value 
105          * @Override
106          * 
107          */
108         public long getLastAccessMs() {
109                 return fLastTouch;
110         }
111
112         
113         /** 
114          * nextMessage() is synchronized method that means at a time only one object can access it.
115          * getName() method returns String which is of type Consumer.Message
116          * @Override
117          * */
118         public synchronized Consumer.Message nextMessage() {
119                 if (getState() == KafkaConsumer.State.CLOSED) {
120                         log.warn("nextMessage() called on closed KafkaConsumer " + getName());
121                         return null;
122                 }
123
124                 try {
125                         ConsumerIterator<byte[], byte[]> it = fStream.iterator();
126                         if (it.hasNext()) {
127                                 final MessageAndMetadata<byte[], byte[]> msg = it.next();
128                                 offset = msg.offset();
129
130                                 return new Consumer.Message() {
131                                         @Override
132                                         public long getOffset() {
133                                                 return msg.offset();
134                                         }
135
136                                         @Override
137                                         public String getMessage() {
138                                                 return new String(msg.message());
139                                         }
140                                 };
141                         }
142                 } catch (kafka.consumer.ConsumerTimeoutException x) {
143                         log.debug(fLogTag + ": ConsumerTimeoutException in Kafka consumer; returning null. ");
144                 } catch (java.lang.IllegalStateException x) {
145                         log.error(fLogTag + ": Illegal state exception in Kafka consumer; dropping stream. " + x.getMessage());
146                 }
147
148                 return null;
149         }
150         
151         /** getOffset() method returns long type value.
152          * returns offset variable value 
153          * @Override
154          * 
155          */
156         public long getOffset() {
157                 return offset;
158         }
159
160         /** commit offsets 
161          * commitOffsets() method will be called on closed of KafkaConsumer.
162          * @Override
163          * 
164          */
165         public void commitOffsets() {
166                 if (getState() == KafkaConsumer.State.CLOSED) {
167                         log.warn("commitOffsets() called on closed KafkaConsumer " + getName());
168                         return;
169                 }
170                 fConnector.commitOffsets();
171         }
172
173         /**
174          * updating fLastTouch with current time in ms
175          */
176         public void touch() {
177                 fLastTouch = System.currentTimeMillis();
178         }
179         
180         /** getLastTouch() method returns long type value.
181          * returns fLastTouch variable value
182          * 
183          */
184         public long getLastTouch() {
185                 return fLastTouch;
186         }
187
188         /**
189          *   setting the kafkaConsumer state to closed
190          */
191         public synchronized void close() {
192                 if (getState() == KafkaConsumer.State.CLOSED) {
193                         log.warn("close() called on closed KafkaConsumer " + getName());
194                         return;
195                 }
196
197                 setState(KafkaConsumer.State.CLOSED);
198                 fConnector.shutdown();
199         }
200         
201         /**
202          * getConsumerGroup() returns Consumer group
203          * @return
204          */
205         public String getConsumerGroup() {
206                 return fGroup;
207         }
208         
209         /**
210          * getConsumerId returns Consumer Id
211          * @return
212          */
213         public String getConsumerId() {
214                 return fId;
215         }
216
217         /**
218          * getState returns kafkaconsumer state
219          * @return
220          */     
221         private KafkaConsumer.State getState() {
222                 return this.state;
223         }
224         
225         /**
226          * setState() sets the kafkaConsumer state
227          * @param state
228          */
229         private void setState(KafkaConsumer.State state) {
230                 this.state = state;
231         }
232
233         private ConsumerConnector fConnector;
234         private final String fTopic;
235         private final String fGroup;
236         private final String fId;
237         private final String fLogTag;
238         private final KafkaStream<byte[], byte[]> fStream;
239         private long fCreateTimeMs;
240         private long fLastTouch;
241         private long offset;
242         private KafkaConsumer.State state;
243         private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class);
244         //private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
245 }