1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import org.json.JSONException;
28 import org.json.JSONObject;
29 import org.onap.dmaap.dmf.mr.CambriaApiException;
30 import org.onap.dmaap.dmf.mr.backends.Consumer;
31 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
32 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
33 import org.onap.dmaap.dmf.mr.metabroker.Topic;
34 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
35 import org.onap.dmaap.dmf.mr.utils.Utils;
37 import javax.servlet.http.HttpServletRequest;
38 import java.io.IOException;
39 import java.io.OutputStream;
40 import java.util.ArrayList;
41 import java.util.Date;
45 * class used to write the consumed messages
47 * @author anowarul.islam
50 public class CambriaOutboundEventStream implements StreamWriter {
51 private static final int kTopLimit = 1024 * 4;
55 * Static inner class that takes all the input parameters for the Kafka consumer,
56 * such as limit, timeout, meta and pretty
58 * @author anowarul.islam
61 public static class Builder {
64 private final Consumer fConsumer;
65 // private final rrNvReadable fSettings; // used during write to tweak
66 // format, decide to explicitly
67 // close stream or not
71 private int fTimeoutMs;
72 private String fTopicFilter;
73 private boolean fPretty;
74 private boolean fWithMeta;
75 ArrayList<Consumer> fKafkaConsumerList;
79 * Constructor; initializes all the consumer parameters
84 public Builder(Consumer c) {
// Seed fLimit, fTimeoutMs and fTopicFilter from CambriaConstants.
// NOTE(review): fLimit is assigned kNoTimeout and fTimeoutMs is assigned kNoLimit;
// the two constants look swapped. Confirm their values in CambriaConstants
// before changing, since callers may rely on the current defaults.
88 fLimit = CambriaConstants.kNoTimeout;
89 fTimeoutMs = CambriaConstants.kNoLimit;
90 fTopicFilter = CambriaConstants.kNoFilter;
99 * constructor initializes with limit
102 * only l no of messages will be consumed
105 public Builder limit(int l) {
111 * constructor initializes with timeout
114 * if there is no message to consume, then DMaaP will wait
118 public Builder timeout(int t) {
124 * constructor initializes with filter
130 public Builder filter(String f) {
131 this.fTopicFilter = f;
136 * constructor initializes with boolean value pretty
139 * messages print in new line
142 public Builder pretty(boolean p) {
148 * constructor initializes with boolean value meta
151 * along with messages offset will print
154 public Builder withMeta(boolean withMeta) {
155 fWithMeta = withMeta;
159 // public Builder atOffset ( int pos )
165 * method returns an object of CambriaOutboundEventStream
168 * @throws CambriaApiException
170 public CambriaOutboundEventStream build() throws CambriaApiException {
// Hand this builder to the private constructor, which reads the builder's fields.
171 return new CambriaOutboundEventStream(this);
175 @SuppressWarnings("unchecked")
179 * @throws CambriaApiException
182 private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
// Copy the consumer, limit and timeout collected by the builder.
183 fConsumer = builder.fConsumer;
184 fLimit = builder.fLimit;
185 fTimeoutMs = builder.fTimeoutMs;
// Copy the output-format flags and the Kafka consumer list.
188 fPretty = builder.fPretty;
189 fWithMeta = builder.fWithMeta;
190 fKafkaConsumerList = builder.fKafkaConsumerList;
215 * interface provides onWait and onMessage methods
218 public interface operation {
222 * @throws IOException
224 void onWait() throws IOException;
227 * provides the output based on the consumer parameters
231 * @throws IOException
234 void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
241 public int getSentCount() {
251 public void write(final OutputStream os) throws IOException {
255 // synchronized(this){
// forEachMessage drives the anonymous operation below; its return value is stored in fSent.
257 fSent = forEachMessage(new operation() {
259 public void onMessage(int count, String msg, String transId, long offSet)
260 throws IOException, JSONException {
// Build a JSON object carrying the offset and the message text, then write it out.
// NOTE(review): getBytes() with no argument uses the platform default charset on
// JVMs before 18; consider an explicit charset if output must be UTF-8.
266 final JSONObject entry = new JSONObject();
267 entry.put("offset", offSet);
268 entry.put("message", msg);
269 os.write(entry.toString().getBytes());
// Write the message alone, quoted as a JSON value.
272 String jsonString = JSONObject.valueToString(msg);
273 os.write(jsonString.getBytes());
// Resolve the metrics topic name from the "metrics.send.cambria.topic" property,
// falling back to "msgrtr.apinode.metrics.dmaap" when the property is absent.
280 String metricTopicname = AJSCPropertiesMap
281 .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
282 if (null == metricTopicname)
283 metricTopicname = "msgrtr.apinode.metrics.dmaap";
// Skip the consumer logging below when the topic being read is the metrics topic itself.
284 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
// Consumer details are logged only when both transaction flags are set.
286 if (istransEnable && istransType) {
287 // final String transactionId =
290 StringBuilder consumerInfo = new StringBuilder();
// Request-derived fields are added only when a context with a request is available.
291 if (null != dmaapContext && null != dmaapContext.getRequest()) {
292 final HttpServletRequest request = dmaapContext.getRequest();
293 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
294 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
295 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
296 consumerInfo.append("consumerGroup= \""
297 + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
298 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
300 log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId
301 + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]");
303 } catch (Exception e) {
312 * It makes thread to wait
314 * @throws IOException
316 public void onWait() throws IOException {
317 os.flush(); // likely totally unnecessary for a network socket
319 // FIXME: would be good to wait/signal
// Restore the interrupt flag so code further up the stack can observe the interruption.
321 } catch (InterruptedException e) {
322 Thread.currentThread().interrupt();
// Set the "transactionId" response header when a context exists and both transaction flags are set.
328 if (null != dmaapContext && istransEnable && istransType) {
330 dmaapContext.getResponse().setHeader("transactionId",
331 Utils.getResponseTransactionId(responseTransactionId));
// close_out_stream defaults to true; the "close.output.stream" property overrides it when present.
337 boolean close_out_stream = true;
338 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream");
339 if (null != strclose_out_stream)
340 close_out_stream = Boolean.parseBoolean(strclose_out_stream);
343 if (close_out_stream) {
354 private String getConsumerGroupFromRequest(String requestURI) {
// Parse only when a non-empty URI was supplied.
355 if (null != requestURI && !requestURI.isEmpty()) {
// Keep what follows "events/" (7 characters).
// NOTE(review): if "events/" is absent, indexOf returns -1 and the cut starts at index 6.
357 String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
// Return the text between the first and the last "/" of that remainder.
// NOTE(review): with fewer than two "/" in the remainder, startIndex exceeds endIndex
// and substring throws StringIndexOutOfBoundsException; confirm callers tolerate that.
359 int startIndex = consumerDetails.indexOf("/") + 1;
360 int endIndex = consumerDetails.lastIndexOf("/");
361 return consumerDetails.substring(startIndex, endIndex);
370 * @throws IOException
371 * @throws JSONException
373 public int forEachMessage(operation op) throws IOException, JSONException {
// A configured limit of 0 falls back to kTopLimit.
374 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
377 boolean firstPing = true;
378 // boolean isTransType=false;
// Deadline for waiting on messages: start time plus fTimeoutMs, minus 500 ms.
379 final long startMs = System.currentTimeMillis();
380 final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll
// Enter when firstPing is set, or while count is still 0 and the deadline has not passed.
382 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
// Pull records until the effective limit is reached or nextMessage() returns null.
389 Consumer.Message msgRecord = null;
390 while (count < effectiveLimit && (msgRecord =
391 fConsumer.nextMessage()) != null) {
394 String transactionid = "";
396 // String msgRecord = msg;
// NOTE(review): msgRecord is a Consumer.Message, not a String, so this does not select
// the JSON-text String constructor of JSONObject. Confirm that the constructor actually
// chosen for this type is the intended one.
397 JSONObject jsonMessage = new JSONObject(msgRecord);
398 String[] keys = JSONObject.getNames(jsonMessage);
399 boolean wrapheader1 = false;
400 boolean wrapheader2 = false;
401 boolean found_attr3 = false;
// Key names looked for among the top-level keys of jsonMessage.
402 String wrapElement1 = "message";
403 String wrapElement2 = "msgWrapMR";
404 String transIdElement = "transactionId";
406 for (String key : keys) {
407 if (key.equals(wrapElement1)) {
409 } else if (key.equals(wrapElement2)) {
411 } else if (key.equals(transIdElement)) {
413 transactionid = jsonMessage.getString(key);
418 // returns contents of attribute 1 if both attributes
419 // present, otherwise
// The "msgWrapMR" payload is checked first, then "message"; both branches also require found_attr3.
421 if (wrapheader2 && found_attr3) {
422 message = jsonMessage.getString(wrapElement2);
423 } else if (wrapheader1 && found_attr3) {
424 message = jsonMessage.getString(wrapElement1);
426 message = msgRecord.getMessage();
428 // jsonMessage = extractMessage(jsonMessage ,
429 // "message","msgWrapMR","transactionId");
431 } catch (JSONException e) { // This check is required for the
432 // message sent by MR AAF flow but
433 // consumed by UEB ACL flow which
434 // wont expect transaction id in
435 // cambria client api
437 log.info("JSON Exception logged when the message is non JSON Format");
438 } catch (Exception exp) {
439 log.info("****Some Exception occured for writing messages in topic" + topic.getName()
440 + " Exception" + exp);
// Fall back to the raw record text when nothing was extracted above.
442 if (message == null || message.equals("")) {
444 message = msgRecord.getMessage();
447 // If filters are enabled/set, message should be in JSON format
448 // for filters to work for
449 // otherwise filter will automatically ignore message in
// Only messages accepted by filterMatches reach the callback, along with the transaction id and offset.
451 if (filterMatches(message)) {
452 op.onMessage(count, message, transactionid, msgRecord.getOffset());
466 * Checks whether filter is initialized
477 private boolean filterMatches(String msg) {
478 boolean result = true;
495 public DMaaPContext getDmaapContext() {
499 public void setDmaapContext(DMaaPContext dmaapContext) {
500 this.dmaapContext = dmaapContext;
503 public Topic getTopic() {
507 public void setTopic(Topic topic) {
511 public void setTopicStyle(boolean aaftopic) {
512 this.isAAFTopic = aaftopic;
515 public void setTransEnabled(boolean transEnable) {
516 this.istransEnable = transEnable;
520 private final Consumer fConsumer;
521 private final int fLimit;
522 private final int fTimeoutMs;
524 private final boolean fPretty;
525 private final boolean fWithMeta;
529 private DMaaPContext dmaapContext;
530 private String responseTransactionId;
532 private boolean isAAFTopic = false;
533 private boolean istransEnable = false;
534 private ArrayList<Consumer> fKafkaConsumerList;
535 private boolean istransType = true;
536 // private static final Logger log =
539 private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
541 public int getfLimit() {
545 public int getfTimeoutMs() {
549 public boolean isfPretty() {
553 public boolean isfWithMeta() {
557 public boolean isAAFTopic() {
561 public boolean isIstransEnable() {
562 return istransEnable;
565 public boolean isIstransType() {