1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources;
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
29 import javax.servlet.http.HttpServletRequest;
30 import org.json.JSONException;
31 import org.json.JSONObject;
33 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
34 import org.onap.dmaap.dmf.mr.CambriaApiException;
35 import org.onap.dmaap.dmf.mr.backends.Consumer;
36 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
37 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
38 import org.onap.dmaap.dmf.mr.metabroker.Topic;
39 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
40 import org.onap.dmaap.dmf.mr.utils.Utils;
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
54 * class used to write the consumed messages
56 * @author anowarul.islam
59 public class CambriaOutboundEventStream implements StreamWriter {
60 private static final int kTopLimit = 1024 * 4;
64 * Static inner class that collects all the input parameters for the Kafka consumer,
65 * such as limit, timeout, meta and pretty
67 * @author anowarul.islam
70 public static class Builder {
73 private final Consumer fConsumer;
74 // private final rrNvReadable fSettings; // used during write to tweak
75 // format, decide to explicitly
76 // close stream or not
80 private int fTimeoutMs;
81 private String fTopicFilter;
82 private boolean fPretty;
83 private boolean fWithMeta;
84 ArrayList<Consumer> fKafkaConsumerList;
88 * constructor it initializes all the consumer parameters
// Builder constructor. Only part of its body is visible in this listing.
93 public Builder(Consumer c) {
// NOTE(review): the limit field is seeded from kNoTimeout and the timeout
// field from kNoLimit. The two constants look swapped. Confirm the values in
// CambriaConstants before changing anything, since callers may depend on the
// current defaults.
97 fLimit = CambriaConstants.kNoTimeout;
98 fTimeoutMs = CambriaConstants.kNoLimit;
// The topic filter starts out as the no-filter constant.
99 fTopicFilter = CambriaConstants.kNoFilter;
108 * Builder method that sets the message limit
111 * only l messages will be consumed
114 public Builder limit(int l) {
120 * Builder method that sets the timeout
123 * if there is no message to consume, then DMaaP will wait
127 public Builder timeout(int t) {
133 * Builder method that sets the topic filter
139 public Builder filter(String f) {
140 this.fTopicFilter = f;
145 * Builder method that sets the boolean pretty flag
148 * each message is printed on a new line
151 public Builder pretty(boolean p) {
157 * Builder method that sets the boolean meta flag
160 * the offset is printed along with each message
163 public Builder withMeta(boolean withMeta) {
164 fWithMeta = withMeta;
168 // public Builder atOffset ( int pos )
174 * Returns a CambriaOutboundEventStream object built from this builder
177 * @throws CambriaApiException
179 public CambriaOutboundEventStream build() throws CambriaApiException {
180 return new CambriaOutboundEventStream(this);
184 @SuppressWarnings("unchecked")
188 * @throws CambriaApiException
191 private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
192 fConsumer = builder.fConsumer;
193 fLimit = builder.fLimit;
194 fTimeoutMs = builder.fTimeoutMs;
197 fPretty = builder.fPretty;
198 fWithMeta = builder.fWithMeta;
199 fKafkaConsumerList = builder.fKafkaConsumerList;
224 * interface provides onWait and onMessage methods
227 public interface operation {
231 * @throws IOException
233 void onWait() throws IOException;
236 * provides the output based on the consumer parameters
240 * @throws IOException
243 void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
250 public int getSentCount() {
// Writes consumed messages to os by driving forEachMessage with an anonymous
// operation; the value forEachMessage returns is stored in fSent.
// Several lines of this method are not visible in this listing.
260 public void write(final OutputStream os) throws IOException {
264 // synchronized(this){
266 fSent = forEachMessage(new operation() {
268 public void onMessage(int count, String msg, String transId, long offSet)
269 throws IOException, JSONException {
// Two output shapes are visible: a JSON object carrying offset and message,
// and the message alone rendered through JSONObject.valueToString. The
// condition choosing between them is not visible here (presumably fWithMeta;
// TODO confirm).
275 final JSONObject entry = new JSONObject();
276 entry.put("offset", offSet);
277 entry.put("message", msg);
// NOTE(review): getBytes() with no argument encodes with the platform default
// charset; an explicit charset would make the output host-independent.
278 os.write(entry.toString().getBytes());
281 String jsonString = JSONObject.valueToString(msg);
282 os.write(jsonString.getBytes());
// The metrics topic name is read from the metrics.send.cambria.topic property
// and falls back to msgrtr.apinode.metrics.dmaap when the property is unset.
289 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap
290 .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
291 if (null == metricTopicname)
292 metricTopicname = "msgrtr.apinode.metrics.dmaap";
// The consumer log line is only produced when the topic being read is not the
// metrics topic (case-insensitive compare) and both transaction flags are on.
293 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
295 if (istransEnable && istransType) {
296 // final String transactionId =
// Request details are appended only when a context with a request is present;
// otherwise the log line carries just the transaction id, length and topic.
299 StringBuilder consumerInfo = new StringBuilder();
300 if (null != dmaapContext && null != dmaapContext.getRequest()) {
301 final HttpServletRequest request = dmaapContext.getRequest();
302 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
303 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
304 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
305 consumerInfo.append("consumerGroup= \""
306 + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
307 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
309 log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId
310 + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]");
// NOTE(review): broad catch; the handler body is not visible in this listing.
312 } catch (Exception e) {
321 * It makes thread to wait
323 * @throws IOException
325 public void onWait() throws IOException {
326 os.flush(); // likely totally unnecessary for a network socket
328 // FIXME: would be good to wait/signal
// The blocking call itself is not visible here. On interruption the thread
// interrupt flag is restored instead of being swallowed.
330 } catch (InterruptedException e) {
331 Thread.currentThread().interrupt();
// The transactionId response header is set only when a context exists and both
// transaction flags are on.
337 if (null != dmaapContext && istransEnable && istransType) {
339 dmaapContext.getResponse().setHeader("transactionId",
340 Utils.getResponseTransactionId(responseTransactionId));
// Closing the output stream defaults to true and can be overridden through the
// close.output.stream property.
346 boolean close_out_stream = true;
347 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream");
348 if (null != strclose_out_stream)
349 close_out_stream = Boolean.parseBoolean(strclose_out_stream);
352 if (close_out_stream) {
// Extracts the text between the first and the last slash of whatever follows
// the events/ marker in the request URI. Presumably the URI tail has the shape
// topic/group/id, which would make that text the consumer group (TODO confirm
// against the REST route). The rest of the method is not visible in this listing.
363 private String getConsumerGroupFromRequest(String requestURI) {
364 if (null != requestURI && !requestURI.isEmpty()) {
// 7 is the length of the events/ marker. NOTE(review): when the marker is
// absent indexOf returns -1, so the cut silently starts at index 6 (or throws
// if the URI is shorter than that) instead of being detected.
366 String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
// NOTE(review): with fewer than two slashes in consumerDetails, startIndex ends
// up greater than endIndex and substring throws StringIndexOutOfBoundsException.
368 int startIndex = consumerDetails.indexOf("/") + 1;
369 int endIndex = consumerDetails.lastIndexOf("/");
370 return consumerDetails.substring(startIndex, endIndex);
379 * @throws IOException
380 * @throws JSONException
// Pulls messages from fConsumer and hands each one that passes filterMatches
// to op.onMessage, together with its transaction id and offset.
// Parts of this method (counter updates, try openers, closing braces, the
// return) are not visible in this listing.
382 public int forEachMessage(operation op) throws IOException, JSONException {
// A configured limit of 0 means the class-wide cap kTopLimit is used instead.
383 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
386 boolean firstPing = true;
387 // boolean isTransType=false;
388 final long startMs = System.currentTimeMillis();
// Deadline is the start time plus the configured timeout, less 500 ms.
389 final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll
// Entered whenever firstPing is set; otherwise repeats only while nothing has
// been counted yet and the deadline has not passed.
391 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
398 Consumer.Message msgRecord = null;
// Drains the consumer until it returns null or the effective limit is reached.
399 while (count < effectiveLimit && (msgRecord =
400 fConsumer.nextMessage()) != null) {
403 String transactionid = "";
405 // String msgRecord = msg;
// NOTE(review): msgRecord is a Consumer.Message, not a String, so this
// presumably resolves to the bean-style JSONObject constructor rather than
// parsing the payload text; confirm against the org.json version in use.
// NOTE(review): JSONObject.getNames is documented to return null for an object
// without keys, which would make the loop below throw; confirm.
406 JSONObject jsonMessage = new JSONObject(msgRecord);
407 String[] keys = JSONObject.getNames(jsonMessage);
408 boolean wrapheader1 = false;
409 boolean wrapheader2 = false;
410 boolean found_attr3 = false;
411 String wrapElement1 = "message";
412 String wrapElement2 = "msgWrapMR";
413 String transIdElement = "transactionId";
// Scans the top-level keys for the two wrapper names and the transaction id.
// The lines that set the three flags are not visible in this listing.
415 for (String key : keys) {
416 if (key.equals(wrapElement1)) {
418 } else if (key.equals(wrapElement2)) {
420 } else if (key.equals(transIdElement)) {
422 transactionid = jsonMessage.getString(key);
427 // returns contents of attribute 1 if both attributes
428 // present, otherwise
// msgWrapMR is checked before message; either wrapper is only unwrapped when
// found_attr3 is set. The raw record text is the remaining visible assignment.
430 if (wrapheader2 && found_attr3) {
431 message = jsonMessage.getString(wrapElement2);
432 } else if (wrapheader1 && found_attr3) {
433 message = jsonMessage.getString(wrapElement1);
435 message = msgRecord.getMessage();
437 // jsonMessage = extractMessage(jsonMessage ,
438 // "message","msgWrapMR","transactionId");
440 } catch (JSONException e) { // This check is required for the
441 // message sent by MR AAF flow but
442 // consumed by UEB ACL flow which
443 // wont expect transaction id in
444 // cambria client api
446 log.info("JSON Exception logged when the message is non JSON Format");
// Any other failure is logged at info level; no rethrow is visible here.
447 } catch (Exception exp) {
448 log.info("****Some Exception occured for writing messages in topic" + topic.getName()
449 + " Exception" + exp);
// Falls back to the raw record text when nothing was extracted above.
451 if (message == null || message.equals("")) {
453 message = msgRecord.getMessage();
456 // If filters are enabled/set, message should be in JSON format
457 // for filters to work for
458 // otherwise filter will automatically ignore message in
// Only messages accepted by filterMatches reach the callback.
460 if (filterMatches(message)) {
461 op.onMessage(count, message, transactionid, msgRecord.getOffset());
475 * Checks whether filter is initialized
486 private boolean filterMatches(String msg) {
487 boolean result = true;
504 public DMaaPContext getDmaapContext() {
508 public void setDmaapContext(DMaaPContext dmaapContext) {
509 this.dmaapContext = dmaapContext;
512 public Topic getTopic() {
516 public void setTopic(Topic topic) {
520 public void setTopicStyle(boolean aaftopic) {
521 this.isAAFTopic = aaftopic;
524 public void setTransEnabled(boolean transEnable) {
525 this.istransEnable = transEnable;
529 private final Consumer fConsumer;
530 private final int fLimit;
531 private final int fTimeoutMs;
533 private final boolean fPretty;
534 private final boolean fWithMeta;
538 private DMaaPContext dmaapContext;
539 private String responseTransactionId;
541 private boolean isAAFTopic = false;
542 private boolean istransEnable = false;
543 private ArrayList<Consumer> fKafkaConsumerList;
544 private boolean istransType = true;
545 // private static final Logger log =
548 private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
550 public int getfLimit() {
554 public int getfTimeoutMs() {
558 public boolean isfPretty() {
562 public boolean isfWithMeta() {
566 public boolean isAAFTopic() {
570 public boolean isIstransEnable() {
571 return istransEnable;
574 public boolean isIstransType() {