1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.resources;
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.util.Date;
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
31 import com.att.eelf.configuration.EELFLogger;
32 import com.att.eelf.configuration.EELFManager;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.json.JSONTokener;
37 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
38 import com.att.nsa.cambria.CambriaApiException;
39 import com.att.nsa.cambria.backends.Consumer;
40 import com.att.nsa.cambria.backends.Consumer.Message;
41 import com.att.nsa.cambria.beans.DMaaPContext;
42 import com.att.nsa.cambria.constants.CambriaConstants;
43 import com.att.nsa.cambria.metabroker.Topic;
44 import com.att.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter;
45 import com.att.nsa.cambria.utils.Utils;
47 import jline.internal.Log;
51 * class used to write the consumed messages
56 public class CambriaOutboundEventStream implements StreamWriter {
57 private static final int kTopLimit = 1024 * 4;
 * Static nested class that takes all the input parameters for the Kafka
 * consumer, such as limit, timeout, meta and pretty.
67 public static class Builder {
70 private final Consumer fConsumer;
71 //private final rrNvReadable fSettings; // used during write to tweak
72 // format, decide to explicitly
73 // close stream or not
77 private int fTimeoutMs;
78 private String fTopicFilter;
79 private boolean fPretty;
80 private boolean fWithMeta;
82 // private int fOffset;
/**
 * Creates a builder for the given consumer and initializes the limit,
 * timeout and topic filter to their defaults.
 *
 * @param c
 *            the consumer the built stream will read messages from
 */
public Builder(Consumer c) {
    this.fConsumer = c;

    // The previous code assigned kNoTimeout to the limit and kNoLimit to
    // the timeout; each default now comes from the constant named for it.
    // NOTE(review): this changes the defaults seen by callers that never
    // call limit()/timeout() - confirm against the constant values.
    fLimit = CambriaConstants.kNoLimit;
    fTimeoutMs = CambriaConstants.kNoTimeout;
    fTopicFilter = CambriaConstants.kNoFilter;
}
 * sets the message limit on this builder
 *            only l messages will be consumed
109 public Builder limit(int l) {
 * sets the timeout on this builder
 *            if there is no message to consume, then DMaaP will wait
122 public Builder timeout(int t) {
128 * constructor initializes with filter
134 public Builder filter(String f) {
135 this.fTopicFilter = f;
 * sets the pretty flag on this builder
 *            each message is printed on a new line
146 public Builder pretty(boolean p) {
152 * constructor initializes with boolean value meta
155 * along with messages offset will print
158 public Builder withMeta(boolean withMeta) {
159 fWithMeta = withMeta;
163 // public Builder atOffset ( int pos )
169 * method returs object of CambriaOutboundEventStream
172 * @throws CambriaApiException
174 public CambriaOutboundEventStream build() throws CambriaApiException {
175 return new CambriaOutboundEventStream(this);
/**
 * Copies the consumer and the limit, timeout, pretty and meta settings
 * from the builder.
 * <p>
 * The builder's topic filter is not read here: the code that used to parse
 * it as JSON and build a filter object from it is disabled, so no filter
 * is created.
 *
 * @param builder
 *            source of the settings
 * @throws CambriaApiException
 *             kept in the signature for existing callers; none of the
 *             statements below throws it
 */
private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
    fConsumer = builder.fConsumer;
    fLimit = builder.fLimit;
    fTimeoutMs = builder.fTimeoutMs;
    fPretty = builder.fPretty;
    fWithMeta = builder.fWithMeta;
}
219 * interface provides onWait and onMessage methods
222 public interface operation {
225 * @throws IOException
227 void onWait() throws IOException;
229 * provides the output based in the consumer paramter
232 * @throws IOException
234 void onMessage(int count, Message msg) throws IOException;
241 public int getSentCount() {
251 public void write(final OutputStream os) throws IOException {
252 //final boolean transactionEnabled = topic.isTransactionEnabled();
253 //final boolean transactionEnabled = isTransEnabled();
254 final boolean transactionEnabled = istransEnable;
257 fSent = forEachMessage(new operation() {
259 public void onMessage(int count, Message msg) throws IOException, JSONException {
262 JSONObject jsonMessage = null;
263 if (transactionEnabled) {
264 jsonMessage = new JSONObject(msg.getMessage());
265 message = jsonMessage.getString("message");
273 final JSONObject entry = new JSONObject();
274 entry.put("offset", msg.getOffset());
275 entry.put("message", message);
276 os.write(entry.toString().getBytes());
278 //os.write(message.getBytes());
279 String jsonString = "";
280 if(transactionEnabled){
281 jsonString= JSONObject.valueToString(message);
283 jsonString = JSONObject.valueToString (msg.getMessage());
285 os.write ( jsonString.getBytes () );
293 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
294 if (null==metricTopicname)
295 metricTopicname="msgrtr.apinode.metrics.dmaap";
297 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
298 if (transactionEnabled) {
299 final String transactionId = jsonMessage.getString("transactionId");
300 responseTransactionId = transactionId;
302 StringBuilder consumerInfo = new StringBuilder();
303 if (null != dmaapContext && null != dmaapContext.getRequest()) {
304 final HttpServletRequest request = dmaapContext.getRequest();
305 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
306 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
307 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
309 "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
310 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
313 log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId
314 + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]");
323 * It makes thread to wait
324 * @throws IOException
326 public void onWait() throws IOException {
327 os.flush(); // likely totally unnecessary for a network socket
329 // FIXME: would be good to wait/signal
331 } catch (InterruptedException e) {
332 Log.error(e.toString());
333 Thread.currentThread().interrupt();
338 //if (null != dmaapContext && isTransactionEnabled()) {
339 if (null != dmaapContext && istransEnable) {
341 dmaapContext.getResponse().setHeader("transactionId",
342 Utils.getResponseTransactionId(responseTransactionId));
348 boolean close_out_stream = true;
349 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream");
350 if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream);
352 //if (fSettings.getBoolean("close.output.stream", true)) {
353 if (close_out_stream) {
363 private String getConsumerGroupFromRequest(String requestURI) {
364 if (null != requestURI && !requestURI.isEmpty()) {
366 String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
368 int startIndex = consumerDetails.indexOf("/") + 1;
369 int endIndex = consumerDetails.lastIndexOf("/");
370 return consumerDetails.substring(startIndex, endIndex);
378 * @throws IOException
379 * @throws JSONException
381 public int forEachMessage(operation op) throws IOException, JSONException {
382 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
385 boolean firstPing = true;
387 final long startMs = System.currentTimeMillis();
388 final long timeoutMs = fTimeoutMs + startMs;
390 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
396 Consumer.Message msg = null;
397 while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) {
401 // if (topic.isTransactionEnabled() || true) {
403 // As part of DMaaP changes we are wrapping the original
404 // message into a json object
405 // and then this json object is further wrapped into message
406 // object before publishing,
407 // so extracting the original message from the message
408 // object for matching with filter.
409 final JSONObject jsonMessage = new JSONObject(msg.getMessage());
410 message = jsonMessage.getString("message");
412 message = msg.getMessage();
415 // If filters are enabled/set, message should be in JSON format
416 // for filters to work for
417 // otherwise filter will automatically ignore message in
419 if (filterMatches(message)) {
420 op.onMessage(count, msg);
431 * Checks whether filter is initialized
433 // private boolean isFilterInitialized() {
434 // return (fHpAlarmFilter != null && fHppe != null);
442 private boolean filterMatches(String msg) {
443 boolean result = true;
444 // if (isFilterInitialized()) {
446 // final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
447 // result = fHpAlarmFilter.matches(fHppe, e);
448 // } catch (JSONException x) {
449 // // the msg may not be JSON
451 // log.error("Failed due to " + x.getMessage());
452 // } catch (Exception x) {
453 // log.error("Error using filter: " + x.getMessage(), x);
460 public DMaaPContext getDmaapContext() {
464 public void setDmaapContext(DMaaPContext dmaapContext) {
465 this.dmaapContext = dmaapContext;
468 public Topic getTopic() {
472 public void setTopic(Topic topic) {
476 public void setTopicStyle(boolean aaftopic) {
477 this.isAAFTopic = aaftopic;
480 public void setTransEnabled ( boolean transEnable) {
481 this.istransEnable = transEnable;
484 /*private boolean isTransactionEnabled() {
485 //return topic.isTransactionEnabled();
486 return true; // let metrics creates for all the topics
489 private boolean isTransEnabled() {
490 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
491 boolean istransidreqd=false;
492 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){
493 istransidreqd = true;
496 return istransidreqd;
500 private final Consumer fConsumer;
501 private final int fLimit;
502 private final int fTimeoutMs;
503 //private final rrNvReadable fSettings;
504 private final boolean fPretty;
505 private final boolean fWithMeta;
507 // private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
508 // private final HpProcessingEngine<HpJsonEvent> fHppe;
509 private DMaaPContext dmaapContext;
510 private String responseTransactionId;
512 private boolean isAAFTopic = false;
513 private boolean istransEnable = false;
516 //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class);
518 private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);