1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources;
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.util.Date;
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
31 import com.att.eelf.configuration.EELFLogger;
32 import com.att.eelf.configuration.EELFManager;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.json.JSONTokener;
36 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.CambriaApiException;
37 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Consumer;
38 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Consumer.Message;
39 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans.DMaaPContext;
40 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.constants.CambriaConstants;
41 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Topic;
42 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.Utils;
43 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter;
45 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
49 * class used to write the consumed messages
54 public class CambriaOutboundEventStream implements StreamWriter {
55 private static final int kTopLimit = 1024 * 4;
59 * Static inner class that collects the input parameters for the Kafka consumer,
60 * such as limit, timeout, meta and pretty.
65 public static class Builder {
68 private final Consumer fConsumer;
69 //private final rrNvReadable fSettings; // used during write to tweak
70 // format, decide to explicitly
71 // close stream or not
75 private int fTimeoutMs;
76 private String fTopicFilter;
77 private boolean fPretty;
78 private boolean fWithMeta;
80 // private int fOffset;
82 * constructor it initializes all the consumer parameters
87 public Builder(Consumer c) {
89 //this.fSettings = settings;
91 fLimit = CambriaConstants.kNoTimeout;
92 fTimeoutMs = CambriaConstants.kNoLimit;
93 fTopicFilter = CambriaConstants.kNoFilter;
96 // fOffset = CambriaEvents.kNextOffset;
101 * constructor initializes with limit
104 * only l no of messages will be consumed
107 public Builder limit(int l) {
113 * constructor initializes with timeout
116 * if there is no message to consume, them DMaaP will wait
120 public Builder timeout(int t) {
126 * constructor initializes with filter
132 public Builder filter(String f) {
133 this.fTopicFilter = f;
138 * constructor initializes with boolean value pretty
141 * messages print in new line
144 public Builder pretty(boolean p) {
150 * constructor initializes with boolean value meta
153 * along with messages offset will print
156 public Builder withMeta(boolean withMeta) {
157 fWithMeta = withMeta;
161 // public Builder atOffset ( int pos )
167 * method returs object of CambriaOutboundEventStream
170 * @throws CambriaApiException
172 public CambriaOutboundEventStream build() throws CambriaApiException {
173 return new CambriaOutboundEventStream(this);
// Private constructor used by Builder.build(): copies the consumer, limit,
// timeout, pretty and with-meta settings from the builder into this stream.
// NOTE(review): builder.fTopicFilter is never read here -- the filter setup
// below is entirely commented out, so a filter passed to the builder is
// ignored by this constructor.
// NOTE(review): with the filter code disabled, nothing visible in this
// constructor needs @SuppressWarnings("unchecked") or throws
// CambriaApiException -- confirm whether both can be dropped.
177 @SuppressWarnings("unchecked")
181 * @throws CambriaApiException
184 private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
185 fConsumer = builder.fConsumer;
186 fLimit = builder.fLimit;
187 fTimeoutMs = builder.fTimeoutMs;
188 //fSettings = builder.fSettings;
190 fPretty = builder.fPretty;
191 fWithMeta = builder.fWithMeta;
// Disabled filter initialisation (kept for reference): it parsed the filter
// string as JSON and mapped failures to HTTP 400 via CambriaApiException.
193 // if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
194 // fHpAlarmFilter = null;
198 // final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
199 // HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
200 // fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
201 // final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
202 // fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
203 // } catch (HpReaderException e) {
204 // // JSON was okay, but the filter engine says it's bogus
205 // throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
206 // "Couldn't create filter: " + e.getMessage());
207 // } catch (JSONException e) {
208 // // user sent a bogus JSON object
209 // throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
210 // "Couldn't parse JSON: " + e.getMessage());
217 * interface provides onWait and onMessage methods
220 public interface operation {
223 * @throws IOException
225 void onWait() throws IOException;
227 * provides the output based in the consumer paramter
230 * @throws IOException
232 void onMessage(int count, Message msg) throws IOException;
// Accessor for the number of messages sent.
// NOTE(review): the body is not visible here; presumably it returns fSent,
// the value write() stores from forEachMessage -- confirm.
239 public int getSentCount() {
// Writes the consumed messages to the given output stream by driving
// forEachMessage with an anonymous `operation`, then (when a DMaaP context is
// set and transactions are enabled) sets the "transactionId" response header,
// and finally consults the "close.output.stream" property.
// NOTE(review): several original lines of this method are not visible in this
// view; the comments below describe only the statements that can be seen.
249 public void write(final OutputStream os) throws IOException {
250 //final boolean transactionEnabled = topic.isTransactionEnabled();
251 //final boolean transactionEnabled = isTransEnabled();
// Local final copy of the flag so the anonymous class below can capture it.
252 final boolean transactionEnabled = istransEnable;
// fSent receives the int returned by forEachMessage.
255 fSent = forEachMessage(new operation() {
257 public void onMessage(int count, Message msg) throws IOException, JSONException {
260 JSONObject jsonMessage = null;
// With transactions enabled the stored payload is parsed as JSON and its
// "message" field is taken as the text to deliver.
261 if (transactionEnabled) {
262 jsonMessage = new JSONObject(msg.getMessage());
263 message = jsonMessage.getString("message");
// Meta form: a JSON object holding the message offset and the message text.
// NOTE(review): getBytes() without a charset uses the platform default
// encoding; consider an explicit charset such as UTF-8.
271 final JSONObject entry = new JSONObject();
272 entry.put("offset", msg.getOffset());
273 entry.put("message", message);
274 os.write(entry.toString().getBytes());
276 //os.write(message.getBytes());
// Plain form: the text is written as a JSON value; with transactions enabled
// the extracted "message" field is used, otherwise the raw payload.
277 String jsonString = "";
278 if(transactionEnabled){
279 jsonString= JSONObject.valueToString(message);
281 jsonString = JSONObject.valueToString (msg.getMessage());
// NOTE(review): same platform-default charset concern as above.
283 os.write ( jsonString.getBytes () );
// Metrics logging: read the metrics topic name from the msgRtr properties,
// falling back to "msgrtr.apinode.metrics.dmaap" when the property is absent.
291 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
292 if (null==metricTopicname)
293 metricTopicname="msgrtr.apinode.metrics.dmaap";
// Consumption from the metrics topic itself is excluded from this logging.
// NOTE(review): topic.getName() is dereferenced without a null check --
// confirm setTopic is always called before write.
295 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
296 if (transactionEnabled) {
// Remember the transaction id of this message for the response header set
// after the loop.
297 final String transactionId = jsonMessage.getString("transactionId");
298 responseTransactionId = transactionId;
// Collect consumer details from the HTTP request for the log line.
300 StringBuilder consumerInfo = new StringBuilder();
301 if (null != dmaapContext && null != dmaapContext.getRequest()) {
302 final HttpServletRequest request = dmaapContext.getRequest();
303 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
304 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
305 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
307 "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
308 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
311 log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId
312 + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]");
321 * It makes thread to wait
322 * @throws IOException
324 public void onWait() throws IOException {
325 os.flush(); // likely totally unnecessary for a network socket
327 // FIXME: would be good to wait/signal
// NOTE(review): the catch body is not visible here; if it swallows the
// interrupt, restore it with Thread.currentThread().interrupt().
329 } catch (InterruptedException e) {
335 //if (null != dmaapContext && isTransactionEnabled()) {
// Expose the last seen transaction id to the client as a response header.
336 if (null != dmaapContext && istransEnable) {
338 dmaapContext.getResponse().setHeader("transactionId",
339 Utils.getResponseTransactionId(responseTransactionId));
// The "close.output.stream" property overrides the default (true) when present.
345 boolean close_out_stream = true;
346 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream");
347 if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream);
349 //if (fSettings.getBoolean("close.output.stream", true)) {
// NOTE(review): the body of this branch is not visible; presumably it closes
// the output stream -- confirm.
350 if (close_out_stream) {
360 private String getConsumerGroupFromRequest(String requestURI) {
361 if (null != requestURI && !requestURI.isEmpty()) {
363 String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
365 int startIndex = consumerDetails.indexOf("/") + 1;
366 int endIndex = consumerDetails.lastIndexOf("/");
367 return consumerDetails.substring(startIndex, endIndex);
375 * @throws IOException
376 * @throws JSONException
// Pulls messages from the consumer and hands each one that passes
// filterMatches to op.onMessage.
// NOTE(review): the declaration and update of `count`, the declaration of
// `message`, the reset of `firstPing`, any op.onWait() call and the return
// statement are not visible in this view; presumably the returned int is the
// number of messages delivered -- confirm.
378 public int forEachMessage(operation op) throws IOException, JSONException {
// A limit of 0 falls back to the class constant kTopLimit.
379 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
382 boolean firstPing = true;
// Despite its name, timeoutMs is an absolute deadline: start time plus the
// configured timeout.
384 final long startMs = System.currentTimeMillis();
385 final long timeoutMs = fTimeoutMs + startMs;
// Enter while firstPing is set; otherwise keep polling only while nothing has
// been counted yet and the deadline has not passed.
387 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
393 Consumer.Message msg = null;
// Drain the consumer until the limit is reached or nextMessage() returns null.
394 while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) {
398 // if (topic.isTransactionEnabled() || true) {
400 // As part of DMaaP changes we are wrapping the original
401 // message into a json object
402 // and then this json object is further wrapped into message
403 // object before publishing,
404 // so extracting the original message from the message
405 // object for matching with filter.
// NOTE(review): both assignments to `message` below are visible, but the
// condition selecting between the unwrapped "message" field and the raw
// payload is not -- confirm.
406 final JSONObject jsonMessage = new JSONObject(msg.getMessage());
407 message = jsonMessage.getString("message");
409 message = msg.getMessage();
412 // If filters are enabled/set, message should be in JSON format
413 // for filters to work for
414 // otherwise filter will automatically ignore message in
// The callback receives the original msg object, not the extracted text.
416 if (filterMatches(message)) {
417 op.onMessage(count, msg);
428 * Checks whether filter is initialized
430 // private boolean isFilterInitialized() {
431 // return (fHpAlarmFilter != null && fHppe != null);
// Decides whether a message passes the topic filter.
// All filter evaluation below is commented out, so `result` is never changed
// after being initialised to true.
// NOTE(review): the return statement is not visible here; presumably
// `result` is returned, which would make every message match -- confirm.
439 private boolean filterMatches(String msg) {
440 boolean result = true;
441 // if (isFilterInitialized()) {
443 // final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
444 // result = fHpAlarmFilter.matches(fHppe, e);
445 // } catch (JSONException x) {
446 // // the msg may not be JSON
448 // log.error("Failed due to " + x.getMessage());
449 // } catch (Exception x) {
450 // log.error("Error using filter: " + x.getMessage(), x);
// Accessors and mutators for the per-request state of this stream.
// NOTE(review): the bodies of getDmaapContext, getTopic and setTopic are not
// visible in this view; presumably plain field get/set -- confirm.

// Returns the DMaaP context (body not visible).
457 public DMaaPContext getDmaapContext() {
// Stores the DMaaP context; write() reads its request and response.
461 public void setDmaapContext(DMaaPContext dmaapContext) {
462 this.dmaapContext = dmaapContext;
// Returns the topic (body not visible).
465 public Topic getTopic() {
// Sets the topic (body not visible); write() calls topic.getName().
469 public void setTopic(Topic topic) {
// Records whether the topic is an AAF topic; read by isTransEnabled().
473 public void setTopicStyle(boolean aaftopic) {
474 this.isAAFTopic = aaftopic;
// Records whether transactions are enabled; read by write().
477 public void setTransEnabled ( boolean transEnable) {
478 this.istransEnable = transEnable;
481 /*private boolean isTransactionEnabled() {
482 //return topic.isTransactionEnabled();
483 return true; // let metrics creates for all the topics
486 private boolean isTransEnabled() {
487 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
488 boolean istransidreqd=false;
489 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){
490 istransidreqd = true;
493 return istransidreqd;
// Immutable settings copied from the Builder by the private constructor.
497 private final Consumer fConsumer;
// Message limit; forEachMessage treats 0 as "use kTopLimit".
498 private final int fLimit;
// Timeout in milliseconds; forEachMessage adds it to the start time to form a deadline.
499 private final int fTimeoutMs;
500 //private final rrNvReadable fSettings;
501 private final boolean fPretty;
502 private final boolean fWithMeta;
504 // private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
505 // private final HpProcessingEngine<HpJsonEvent> fHppe;
// Mutable per-request state, set through the public setters.
506 private DMaaPContext dmaapContext;
// Transaction id remembered by write() for the "transactionId" response header.
507 private String responseTransactionId;
509 private boolean isAAFTopic = false;
510 private boolean istransEnable = false;
513 //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class);
515 private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);