1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.resources;
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
32 import org.apache.kafka.clients.consumer.ConsumerRecord;
33 import org.apache.kafka.clients.consumer.ConsumerRecords;
34 import org.json.JSONException;
35 import org.json.JSONObject;
36 import org.json.JSONTokener;
38 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
39 import com.att.dmf.mr.CambriaApiException;
40 import com.att.dmf.mr.backends.Consumer;
41 import com.att.dmf.mr.beans.DMaaPContext;
42 import com.att.dmf.mr.constants.CambriaConstants;
43 import com.att.dmf.mr.metabroker.Topic;
44 import com.att.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
45 import com.att.dmf.mr.utils.Utils;
46 import com.att.eelf.configuration.EELFLogger;
47 import com.att.eelf.configuration.EELFManager;
48 //import com.att.nsa.drumlin.till.nv.rrNvReadable;
49 /*import com.att.sa.highlandPark.config.HpConfigContext;
50 import com.att.sa.highlandPark.config.HpReaderException;
51 import com.att.sa.highlandPark.events.HpJsonEvent;
52 import com.att.sa.highlandPark.events.HpJsonEventFactory;
53 import com.att.sa.highlandPark.processor.HpAlarmFilter;
54 import com.att.sa.highlandPark.processor.HpEvent;
55 import com.att.sa.highlandPark.processor.HpProcessingEngine;
56 import com.att.sa.highlandPark.processor.HpProcessingEngine.EventFactory;
59 * class used to write the consumed messages
61 * @author anowarul.islam
// NOTE(review): this listing is a sampled view — the embedded original line
// numbers jump, so braces/statements between visible lines are not shown here.
// Streams messages pulled from a Consumer out to an HTTP response stream.
64 public class CambriaOutboundEventStream implements StreamWriter {
// Hard cap applied when the caller requests limit == 0 (see forEachMessage).
65 private static final int kTopLimit = 1024 * 4;
69 * static inner class; it takes all the input parameters for the kafka consumer
70 * like limit, timeout, meta, pretty
72 * @author anowarul.islam
75 public static class Builder {
// Source of messages for the stream being built.
78 private final Consumer fConsumer;
79 // private final rrNvReadable fSettings; // used during write to tweak
80 // format, decide to explicitly
81 // close stream or not
// Consumer tuning knobs; the fLimit field declaration is not visible in this
// sampled view but is assigned below and read by build()'s target constructor.
85 private int fTimeoutMs;
86 private String fTopicFilter;
87 private boolean fPretty;
88 private boolean fWithMeta;
89 ArrayList<Consumer> fKafkaConsumerList;
91 // private int fOffset;
93 * constructor; it initializes all the consumer parameters
98 public Builder(Consumer c) {
100 // this.fSettings = settings;
// NOTE(review): the constant names look swapped — fLimit is seeded from
// kNoTimeout and fTimeoutMs from kNoLimit. Confirm against CambriaConstants
// before changing; existing callers may depend on the current defaults.
102 fLimit = CambriaConstants.kNoTimeout;
103 fTimeoutMs = CambriaConstants.kNoLimit;
104 fTopicFilter = CambriaConstants.kNoFilter;
107 //this.fKafkaConsumerList = consList;
108 // fOffset = CambriaEvents.kNextOffset;
113 * builder setter: initializes the limit
116 * only l no of messages will be consumed
119 public Builder limit(int l) {
125 * builder setter: initializes the timeout
128 * if there is no message to consume, then DMaaP will wait
132 public Builder timeout(int t) {
138 * builder setter: initializes the filter
144 public Builder filter(String f) {
145 this.fTopicFilter = f;
150 * builder setter: initializes the boolean value pretty
153 * messages print in new line
156 public Builder pretty(boolean p) {
162 * builder setter: initializes the boolean value meta
165 * along with messages offset will print
168 public Builder withMeta(boolean withMeta) {
169 fWithMeta = withMeta;
173 // public Builder atOffset ( int pos )
179 * method returns an object of CambriaOutboundEventStream
182 * @throws CambriaApiException
184 public CambriaOutboundEventStream build() throws CambriaApiException {
185 return new CambriaOutboundEventStream(this);
189 @SuppressWarnings("unchecked")
193 * @throws CambriaApiException
// Copies the builder's settings into the (final) stream fields. The
// HighlandPark filter wiring below is commented out in the current code;
// filterMatches() therefore accepts everything (see that method).
196 private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
197 fConsumer = builder.fConsumer;
198 fLimit = builder.fLimit;
199 fTimeoutMs = builder.fTimeoutMs;
202 fPretty = builder.fPretty;
203 fWithMeta = builder.fWithMeta;
204 fKafkaConsumerList = builder.fKafkaConsumerList;
// Disabled HighlandPark filter construction: kept for reference. If re-enabled,
// a bad filter maps to SC_BAD_REQUEST rather than a server error.
205 /* if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
206 fHpAlarmFilter = null;
210 final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
211 HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
212 fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
213 final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
214 fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
215 } catch (HpReaderException e) {
216 // JSON was okay, but the filter engine says it's bogus
217 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
218 "Couldn't create filter: " + e.getMessage());
219 } catch (JSONException e) {
220 // user sent a bogus JSON object
221 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
222 "Couldn't parse JSON: " + e.getMessage());
229 * interface provides onWait and onMessage methods
// Callback used by forEachMessage(): onMessage is invoked per consumed record,
// onWait when no record is available yet and the loop is idling.
232 public interface operation {
236 * @throws IOException
238 void onWait() throws IOException;
241 * provides the output based on the consumer parameter
245 * @throws IOException
248 void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
255 public int getSentCount() {
// Streams consumed messages to the HTTP output stream. With-meta output wraps
// each message as {"offset":...,"message":...}; otherwise the raw payload is
// re-encoded via JSONObject.valueToString. Also emits a per-consume audit log
// line when transactions are enabled, and sets the transactionId response header.
265 public void write(final OutputStream os) throws IOException {
267 // final boolean transactionEnabled = isTransEnabled();
268 // final boolean transactionEnabled = istransEnable;
269 // synchronized(this){
271 fSent = forEachMessage(new operation() {
273 public void onMessage(int count, String msg, String transId, long offSet)
274 throws IOException, JSONException {
280 final JSONObject entry = new JSONObject();
281 entry.put("offset", offSet);
282 entry.put("message", msg);
// NOTE(review): getBytes() here and below uses the platform default charset;
// consider an explicit UTF-8 charset — confirm expected wire encoding first.
283 os.write(entry.toString().getBytes());
286 String jsonString = JSONObject.valueToString(msg);
287 os.write(jsonString.getBytes());
// Skip audit logging for the internal metrics topic itself.
294 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap
295 .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
296 if (null == metricTopicname)
297 metricTopicname = "msgrtr.apinode.metrics.dmaap";
298 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
300 if (istransEnable && istransType) {
301 // final String transactionId =
303 // responseTransactionId = transId;
304 StringBuilder consumerInfo = new StringBuilder();
305 if (null != dmaapContext && null != dmaapContext.getRequest()) {
306 final HttpServletRequest request = dmaapContext.getRequest();
307 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
308 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
309 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
310 consumerInfo.append("consumerGroup= \""
311 + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
312 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
314 log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId
315 + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]");
317 } catch (Exception e) {
326 * It makes thread to wait
328 * @throws IOException
330 public void onWait() throws IOException {
331 os.flush(); // likely totally unnecessary for a network socket
333 // FIXME: would be good to wait/signal
// NOTE(review): InterruptedException appears to be swallowed here; best
// practice is to re-interrupt (Thread.currentThread().interrupt()) — the
// handler body is not visible in this sampled view, confirm before changing.
335 } catch (InterruptedException e) {
341 // if (null != dmaapContext && isTransactionEnabled()) {
342 if (null != dmaapContext && istransEnable && istransType) {
344 dmaapContext.getResponse().setHeader("transactionId",
345 Utils.getResponseTransactionId(responseTransactionId));
// Whether to close the servlet output stream is configurable; default true.
351 boolean close_out_stream = true;
352 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream");
353 if (null != strclose_out_stream)
354 close_out_stream = Boolean.parseBoolean(strclose_out_stream);
356 // if (fSettings.getBoolean("close.output.stream", true)) {
357 if (close_out_stream) {
// Extracts the consumer-group segment from a request URI of the presumed shape
// ".../events/{topic}/{group}/{consumerId}" — takes the text between the first
// and last '/' after "events/".
// NOTE(review): if the URI lacks "events/" or the expected segment count,
// indexOf returns -1 and substring can throw StringIndexOutOfBoundsException;
// callers appear to invoke this only from the consume path — confirm inputs.
368 private String getConsumerGroupFromRequest(String requestURI) {
369 if (null != requestURI && !requestURI.isEmpty()) {
371 String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
373 int startIndex = consumerDetails.indexOf("/") + 1;
374 int endIndex = consumerDetails.lastIndexOf("/");
375 return consumerDetails.substring(startIndex, endIndex);
384 * @throws IOException
385 * @throws JSONException
// Drains up to effectiveLimit messages from fConsumer, invoking op.onMessage
// per record. Loops until at least one message arrives or the timeout window
// (fTimeoutMs minus 500ms poll allowance) elapses. Unwraps "message"/"msgWrapMR"
// envelopes and extracts "transactionId" when the payload is JSON; non-JSON
// payloads are passed through as-is.
387 public int forEachMessage(operation op) throws IOException, JSONException {
// limit == 0 means "no explicit limit" and falls back to the kTopLimit cap.
388 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
391 boolean firstPing = true;
392 // boolean isTransType=false;
393 final long startMs = System.currentTimeMillis();
394 final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll
396 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
403 Consumer.Message msgRecord = null;
404 while (count < effectiveLimit && (msgRecord =
405 fConsumer.nextMessage()) != null) {
408 String transactionid = "";
410 // String msgRecord = msg;
// NOTE(review): this passes the Message object itself to the JSONObject bean
// constructor rather than msgRecord.getMessage(); confirm this is intentional —
// the JSONException fallback below masks any mismatch for non-JSON payloads.
411 JSONObject jsonMessage = new JSONObject(msgRecord);
412 String[] keys = JSONObject.getNames(jsonMessage);
413 boolean wrapheader1 = false;
414 boolean wrapheader2 = false;
415 boolean found_attr3 = false;
416 String wrapElement1 = "message";
417 String wrapElement2 = "msgWrapMR";
418 String transIdElement = "transactionId";
420 for (String key : keys) {
421 if (key.equals(wrapElement1)) {
423 } else if (key.equals(wrapElement2)) {
425 } else if (key.equals(transIdElement)) {
427 transactionid = jsonMessage.getString(key);
432 // returns contents of attribute 1 if both attributes
433 // present, otherwise
// Envelope unwrap precedence: msgWrapMR first, then message, else raw payload.
435 if (wrapheader2 && found_attr3) {
436 message = jsonMessage.getString(wrapElement2);
437 } else if (wrapheader1 && found_attr3) {
438 message = jsonMessage.getString(wrapElement1);
440 message = msgRecord.getMessage();
442 // jsonMessage = extractMessage(jsonMessage ,
443 // "message","msgWrapMR","transactionId");
445 } catch (JSONException e) { // This check is required for the
446 // message sent by MR AAF flow but
447 // consumed by UEB ACL flow which
448 // wont expect transaction id in
449 // cambria client api
451 log.info("JSON Exception logged when the message is non JSON Format");
452 } catch (Exception exp) {
453 log.info("****Some Exception occured for writing messages in topic" + topic.getName()
454 + " Exception" + exp);
// Fallback: never deliver an empty payload — use the raw record text.
456 if (message == null || message.equals("")) {
458 message = msgRecord.getMessage();
461 // If filters are enabled/set, message should be in JSON format
462 // for filters to work for
463 // otherwise filter will automatically ignore message in
465 if (filterMatches(message)) {
466 op.onMessage(count, message, transactionid, msgRecord.getOffset());
480 * Checks whether filter is initialized
482 /*private boolean isFilterInitialized() {
483 return (fHpAlarmFilter != null && fHppe != null);
// Currently always returns true: the HighlandPark filter engine below is
// commented out, so no message is ever filtered out. If re-enabled, non-JSON
// messages are treated as non-matching (logged, result stays from the filter).
491 private boolean filterMatches(String msg) {
492 boolean result = true;
493 /*if (isFilterInitialized()) {
495 final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
496 result = fHpAlarmFilter.matches(fHppe, e);
497 } catch (JSONException x) {
498 // the msg may not be JSON
500 log.error("Failed due to " + x.getMessage());
501 } catch (Exception x) {
502 log.error("Error using filter: " + x.getMessage(), x);
// Plain accessors/mutators for per-request collaboration state. These are set
// by the consume path before write() is called; they are not thread-safe.
509 public DMaaPContext getDmaapContext() {
513 public void setDmaapContext(DMaaPContext dmaapContext) {
514 this.dmaapContext = dmaapContext;
517 public Topic getTopic() {
521 public void setTopic(Topic topic) {
// Marks the topic as AAF-secured ("topic style"); read as isAAFTopic.
525 public void setTopicStyle(boolean aaftopic) {
526 this.isAAFTopic = aaftopic;
// Enables transaction-id audit logging in write()/onMessage.
529 public void setTransEnabled(boolean transEnable) {
530 this.istransEnable = transEnable;
// Immutable configuration captured from Builder at construction time.
534 private final Consumer fConsumer;
535 private final int fLimit;
536 private final int fTimeoutMs;
537 // private final rrNvReadable fSettings;
538 private final boolean fPretty;
539 private final boolean fWithMeta;
541 // private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
542 //private final HpProcessingEngine<HpJsonEvent> fHppe;
// Mutable per-request state, injected via the setters above.
543 private DMaaPContext dmaapContext;
544 private String responseTransactionId;
546 private boolean isAAFTopic = false;
547 private boolean istransEnable = false;
548 private ArrayList<Consumer> fKafkaConsumerList;
// NOTE(review): istransType defaults to true and no setter is visible in this
// sampled view — confirm where (if anywhere) it is toggled.
549 private boolean istransType = true;
550 // private static final Logger log =
551 // Logger.getLogger(CambriaOutboundEventStream.class);
553 private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);