X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fdmf%2Fmr%2Fresources%2FCambriaOutboundEventStream.java;fp=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fnsa%2Fcambria%2Fresources%2FCambriaOutboundEventStream.java;h=837c956754aef398650b1685f518b57e4262393b;hb=b32effcaf5684d5e2f338a4537b71a2375c534e5;hp=7366ddebe0f44f49fdd62d94cfe6b69179ca6fe2;hpb=0823cb186012c8e6b7de3d979dfabb9f838da7c2;p=dmaap%2Fmessagerouter%2Fmsgrtr.git diff --git a/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java similarity index 50% rename from src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java rename to src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java index 7366dde..837c956 100644 --- a/src/main/java/com/att/nsa/cambria/resources/CambriaOutboundEventStream.java +++ b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java @@ -8,49 +8,57 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ -package com.att.nsa.cambria.resources; +package com.att.dmf.mr.resources; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Date; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.json.JSONException; import org.json.JSONObject; import org.json.JSONTokener; import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.backends.Consumer; -import com.att.nsa.cambria.backends.Consumer.Message; -import com.att.nsa.cambria.beans.DMaaPContext; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.metabroker.Topic; -import com.att.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter; -import com.att.nsa.cambria.utils.Utils; - -import jline.internal.Log; - - +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.backends.Consumer; +import com.att.dmf.mr.beans.DMaaPContext; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.metabroker.Topic; +import com.att.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter; +import com.att.dmf.mr.utils.Utils; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.drumlin.till.nv.rrNvReadable; +/*import com.att.sa.highlandPark.config.HpConfigContext; +import com.att.sa.highlandPark.config.HpReaderException; +import com.att.sa.highlandPark.events.HpJsonEvent; +import com.att.sa.highlandPark.events.HpJsonEventFactory; +import com.att.sa.highlandPark.processor.HpAlarmFilter; +import com.att.sa.highlandPark.processor.HpEvent; +import com.att.sa.highlandPark.processor.HpProcessingEngine; +import com.att.sa.highlandPark.processor.HpProcessingEngine.EventFactory; +*/ /** * class used to write the consumed messages * - * @author author + * @author anowarul.islam * */ public class CambriaOutboundEventStream implements StreamWriter { @@ -61,16 +69,16 @@ public class CambriaOutboundEventStream implements StreamWriter { * static innerclass it takes all the input parameter for kafka consumer * like limit, timeout, meta, pretty * - * @author author + * @author anowarul.islam * */ public static class Builder { // Required private final Consumer fConsumer; - //private final rrNvReadable fSettings; // used during write to tweak - // format, decide to explicitly - // close stream or not + // private final rrNvReadable fSettings; // used during write to tweak + // format, decide to explicitly + // close stream or not // Optional private int fLimit; @@ -78,6 +86,7 @@ public class CambriaOutboundEventStream implements StreamWriter { private String fTopicFilter; private boolean fPretty; private boolean fWithMeta; + ArrayList fKafkaConsumerList; // private int fOffset; /** @@ -88,13 +97,14 @@ public class CambriaOutboundEventStream implements StreamWriter { */ public Builder(Consumer c) { this.fConsumer = c; - //this.fSettings = settings; + // this.fSettings = settings; fLimit = CambriaConstants.kNoTimeout; fTimeoutMs = CambriaConstants.kNoLimit; fTopicFilter = CambriaConstants.kNoFilter; fPretty = false; fWithMeta = false; + //this.fKafkaConsumerList = consList; // fOffset = CambriaEvents.kNextOffset; } @@ -187,31 +197,31 @@ public class CambriaOutboundEventStream implements StreamWriter { fConsumer = builder.fConsumer; fLimit = builder.fLimit; fTimeoutMs = builder.fTimeoutMs; - //fSettings = builder.fSettings; + // fSettings = builder.fSettings; fSent = 0; fPretty = builder.fPretty; fWithMeta = builder.fWithMeta; - -// if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) { -// fHpAlarmFilter = null; -// fHppe = null; -// } else { -// try { -// final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter)); -// HpConfigContext cc = new HpConfigContext(); -// fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter); -// final EventFactory ef = new HpJsonEventFactory(); -// fHppe = new HpProcessingEngine(ef); -// } catch (HpReaderException e) { -// // JSON was okay, but the filter engine says it's bogus -// throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, -// "Couldn't create filter: " + e.getMessage()); -// } catch (JSONException e) { -// // user sent a bogus JSON object -// throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, -// "Couldn't parse JSON: " + e.getMessage()); -// } -// } + fKafkaConsumerList = builder.fKafkaConsumerList; + /* if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) { + fHpAlarmFilter = null; + fHppe = null; + } else { + try { + final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter)); + HpConfigContext cc = new HpConfigContext(); + fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter); + final EventFactory ef = new HpJsonEventFactory(); + fHppe = new HpProcessingEngine(ef); + } catch (HpReaderException e) { + // JSON was okay, but the filter engine says it's bogus + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, + "Couldn't create filter: " + e.getMessage()); + } catch (JSONException e) { + // user sent a bogus JSON object + throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, + "Couldn't parse JSON: " + e.getMessage()); + } + }*/ } /** @@ -222,16 +232,20 @@ public class CambriaOutboundEventStream implements StreamWriter { public interface operation { /** * Call thread.sleep + * * @throws IOException */ void onWait() throws IOException; -/** - * provides the output based in the consumer paramter - * @param count - * @param msg - * @throws IOException - */ - void onMessage(int count, Message msg) throws IOException; + + /** + * provides the output based in the consumer paramter + * + * @param count + * @param msg + * @throws IOException + */ + // void onMessage(int count, Message msg) throws IOException; + void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException; } /** @@ -246,74 +260,63 @@ public class CambriaOutboundEventStream implements StreamWriter { /** * * @param os - * throws IOException + * throws IOException */ public void write(final OutputStream os) throws IOException { - //final boolean transactionEnabled = topic.isTransactionEnabled(); - //final boolean transactionEnabled = isTransEnabled(); - final boolean transactionEnabled = istransEnable; + // final boolean transactionEnabled = topic.isTransactionEnabled(); + // final boolean transactionEnabled = isTransEnabled(); + // final boolean transactionEnabled = istransEnable; + // synchronized(this){ os.write('['); - fSent = forEachMessage(new operation() { @Override - public void onMessage(int count, Message msg) throws IOException, JSONException { - - String message = ""; - JSONObject jsonMessage = null; - if (transactionEnabled) { - jsonMessage = new JSONObject(msg.getMessage()); - message = jsonMessage.getString("message"); - } + public void onMessage(int count, String msg, String transId, long offSet) + throws IOException, JSONException { if (count > 0) { os.write(','); } - if (fWithMeta) { final JSONObject entry = new JSONObject(); - entry.put("offset", msg.getOffset()); - entry.put("message", message); + entry.put("offset", offSet); + entry.put("message", msg); os.write(entry.toString().getBytes()); } else { - //os.write(message.getBytes()); - String jsonString = ""; - if(transactionEnabled){ - jsonString= JSONObject.valueToString(message); - }else{ - jsonString = JSONObject.valueToString (msg.getMessage()); - } - os.write ( jsonString.getBytes () ); + // os.write(message.getBytes()); + String jsonString = JSONObject.valueToString(msg); + os.write(jsonString.getBytes()); } if (fPretty) { os.write('\n'); } - - String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); - if (null==metricTopicname) - metricTopicname="msgrtr.apinode.metrics.dmaap"; - - if (!metricTopicname.equalsIgnoreCase(topic.getName())) { - if (transactionEnabled) { - final String transactionId = jsonMessage.getString("transactionId"); - responseTransactionId = transactionId; - - StringBuilder consumerInfo = new StringBuilder(); - if (null != dmaapContext && null != dmaapContext.getRequest()) { - final HttpServletRequest request = dmaapContext.getRequest(); - consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\","); - consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\","); - consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\","); - consumerInfo.append( - "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\","); - consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\","); + String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap + .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic"); + if (null == metricTopicname) + metricTopicname = "msgrtr.apinode.metrics.dmaap"; + if (!metricTopicname.equalsIgnoreCase(topic.getName())) { + try { + if (istransEnable && istransType) { + // final String transactionId = + // jsonMessage.getString("transactionId"); + // responseTransactionId = transId; + StringBuilder consumerInfo = new StringBuilder(); + if (null != dmaapContext && null != dmaapContext.getRequest()) { + final HttpServletRequest request = dmaapContext.getRequest(); + consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\","); + consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\","); + consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\","); + consumerInfo.append("consumerGroup= \"" + + getConsumerGroupFromRequest(request.getRequestURI()) + "\","); + consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\","); + } + log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId + + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]"); + } + } catch (Exception e) { } - - log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId - + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]"); } - } } @@ -321,6 +324,7 @@ public class CambriaOutboundEventStream implements StreamWriter { /** * * It makes thread to wait + * * @throws IOException */ public void onWait() throws IOException { @@ -329,15 +333,14 @@ public class CambriaOutboundEventStream implements StreamWriter { // FIXME: would be good to wait/signal Thread.sleep(100); } catch (InterruptedException e) { - Log.error(e.toString()); - Thread.currentThread().interrupt(); + // ignore } } }); - //if (null != dmaapContext && isTransactionEnabled()) { - if (null != dmaapContext && istransEnable) { - + // if (null != dmaapContext && isTransactionEnabled()) { + if (null != dmaapContext && istransEnable && istransType) { + dmaapContext.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId)); } @@ -346,12 +349,14 @@ public class CambriaOutboundEventStream implements StreamWriter { os.flush(); boolean close_out_stream = true; - String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream"); - if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream); - - //if (fSettings.getBoolean("close.output.stream", true)) { - if (close_out_stream) { + String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream"); + if (null != strclose_out_stream) + close_out_stream = Boolean.parseBoolean(strclose_out_stream); + + // if (fSettings.getBoolean("close.output.stream", true)) { + if (close_out_stream) { os.close(); + // } } } @@ -371,21 +376,22 @@ public class CambriaOutboundEventStream implements StreamWriter { } return null; } -/** - * - * @param op - * @return - * @throws IOException - * @throws JSONException - */ + + /** + * + * @param op + * @return + * @throws IOException + * @throws JSONException + */ public int forEachMessage(operation op) throws IOException, JSONException { final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit); int count = 0; boolean firstPing = true; - + // boolean isTransType=false; final long startMs = System.currentTimeMillis(); - final long timeoutMs = fTimeoutMs + startMs; + final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) { if (!firstPing) { @@ -393,23 +399,63 @@ public class CambriaOutboundEventStream implements StreamWriter { } firstPing = false; - Consumer.Message msg = null; - while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) { + + Consumer.Message msgRecord = null; + while (count < effectiveLimit && (msgRecord = + fConsumer.nextMessage()) != null) { - String message = ""; - // if (topic.isTransactionEnabled() || true) { - if (istransEnable) { - // As part of DMaaP changes we are wrapping the original - // message into a json object - // and then this json object is further wrapped into message - // object before publishing, - // so extracting the original message from the message - // object for matching with filter. - final JSONObject jsonMessage = new JSONObject(msg.getMessage()); - message = jsonMessage.getString("message"); - } else { - message = msg.getMessage(); + String transactionid = ""; + try { + // String msgRecord = msg; + JSONObject jsonMessage = new JSONObject(msgRecord); + String[] keys = JSONObject.getNames(jsonMessage); + boolean wrapheader1 = false; + boolean wrapheader2 = false; + boolean found_attr3 = false; + String wrapElement1 = "message"; + String wrapElement2 = "msgWrapMR"; + String transIdElement = "transactionId"; + if (null != keys) { + for (String key : keys) { + if (key.equals(wrapElement1)) { + wrapheader1 = true; + } else if (key.equals(wrapElement2)) { + wrapheader2 = true; + } else if (key.equals(transIdElement)) { + found_attr3 = true; + transactionid = jsonMessage.getString(key); + } + } + } + + // returns contents of attribute 1 if both attributes + // present, otherwise + // the whole msg + if (wrapheader2 && found_attr3) { + message = jsonMessage.getString(wrapElement2); + } else if (wrapheader1 && found_attr3) { + message = jsonMessage.getString(wrapElement1); + } else { + message = msgRecord.getMessage(); + } + // jsonMessage = extractMessage(jsonMessage , + // "message","msgWrapMR","transactionId"); + istransType = true; + } catch (JSONException e) { // This check is required for the + // message sent by MR AAF flow but + // consumed by UEB ACL flow which + // wont expect transaction id in + // cambria client api + // Ignore + log.info("JSON Exception logged when the message is non JSON Format"); + } catch (Exception exp) { + log.info("****Some Exception occured for writing messages in topic" + topic.getName() + + " Exception" + exp); + } + if (message == null || message.equals("")) { + istransType = false; + message = msgRecord.getMessage(); } // If filters are enabled/set, message should be in JSON format @@ -417,23 +463,26 @@ public class CambriaOutboundEventStream implements StreamWriter { // otherwise filter will automatically ignore message in // non-json format. if (filterMatches(message)) { - op.onMessage(count, msg); + op.onMessage(count, message, transactionid, msgRecord.getOffset()); count++; + } + } } - return count; } + + /** * * Checks whether filter is initialized */ -// private boolean isFilterInitialized() { -// return (fHpAlarmFilter != null && fHppe != null); -// } - + /*private boolean isFilterInitialized() { + return (fHpAlarmFilter != null && fHppe != null); + } +*/ /** * * @param msg @@ -441,18 +490,18 @@ public class CambriaOutboundEventStream implements StreamWriter { */ private boolean filterMatches(String msg) { boolean result = true; -// if (isFilterInitialized()) { -// try { -// final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg)); -// result = fHpAlarmFilter.matches(fHppe, e); -// } catch (JSONException x) { -// // the msg may not be JSON -// result = false; -// log.error("Failed due to " + x.getMessage()); -// } catch (Exception x) { -// log.error("Error using filter: " + x.getMessage(), x); -// } -// } + /*if (isFilterInitialized()) { + try { + final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg)); + result = fHpAlarmFilter.matches(fHppe, e); + } catch (JSONException x) { + // the msg may not be JSON + result = false; + log.error("Failed due to " + x.getMessage()); + } catch (Exception x) { + log.error("Error using filter: " + x.getMessage(), x); + } + }*/ return result; } @@ -472,48 +521,34 @@ public class CambriaOutboundEventStream implements StreamWriter { public void setTopic(Topic topic) { this.topic = topic; } - + public void setTopicStyle(boolean aaftopic) { this.isAAFTopic = aaftopic; } - - public void setTransEnabled ( boolean transEnable) { + + public void setTransEnabled(boolean transEnable) { this.istransEnable = transEnable; } - /*private boolean isTransactionEnabled() { - //return topic.isTransactionEnabled(); - return true; // let metrics creates for all the topics - }*/ - - private boolean isTransEnabled() { - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd"); - boolean istransidreqd=false; - if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){ - istransidreqd = true; - } - - return istransidreqd; - - } private final Consumer fConsumer; private final int fLimit; private final int fTimeoutMs; - //private final rrNvReadable fSettings; + // private final rrNvReadable fSettings; private final boolean fPretty; private final boolean fWithMeta; private int fSent; // private final HpAlarmFilter fHpAlarmFilter; -// private final HpProcessingEngine fHppe; + //private final HpProcessingEngine fHppe; private DMaaPContext dmaapContext; private String responseTransactionId; private Topic topic; private boolean isAAFTopic = false; private boolean istransEnable = false; - + private ArrayList fKafkaConsumerList; + private boolean istransType = true; + // private static final Logger log = + // Logger.getLogger(CambriaOutboundEventStream.class); - //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class); } \ No newline at end of file