update the testcases after the kafka 11 changes
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / resources / CambriaOutboundEventStream.java
@@ -8,49 +8,57 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+*  
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *
+ *  
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
  *  
  *******************************************************************************/
-package com.att.nsa.cambria.resources;
+package com.att.dmf.mr.resources;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Date;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.JSONTokener;
 
 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.backends.Consumer;
-import com.att.nsa.cambria.backends.Consumer.Message;
-import com.att.nsa.cambria.beans.DMaaPContext;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.metabroker.Topic;
-import com.att.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter;
-import com.att.nsa.cambria.utils.Utils;
-
-import jline.internal.Log;
-
-
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.backends.Consumer;
+import com.att.dmf.mr.beans.DMaaPContext;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.metabroker.Topic;
+import com.att.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
+import com.att.dmf.mr.utils.Utils;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+//import com.att.nsa.drumlin.till.nv.rrNvReadable;
+/*import com.att.sa.highlandPark.config.HpConfigContext;
+import com.att.sa.highlandPark.config.HpReaderException;
+import com.att.sa.highlandPark.events.HpJsonEvent;
+import com.att.sa.highlandPark.events.HpJsonEventFactory;
+import com.att.sa.highlandPark.processor.HpAlarmFilter;
+import com.att.sa.highlandPark.processor.HpEvent;
+import com.att.sa.highlandPark.processor.HpProcessingEngine;
+import com.att.sa.highlandPark.processor.HpProcessingEngine.EventFactory;
+*/
 /**
  * class used to write the consumed messages
  * 
- * @author author
+ * @author anowarul.islam
  *
  */
 public class CambriaOutboundEventStream implements StreamWriter {
@@ -61,16 +69,16 @@ public class CambriaOutboundEventStream implements StreamWriter {
         * static innerclass it takes all the input parameter for kafka consumer
         * like limit, timeout, meta, pretty
         * 
-        * @author author
+        * @author anowarul.islam
         *
         */
        public static class Builder {
 
                // Required
                private final Consumer fConsumer;
-               //private final rrNvReadable fSettings;   // used during write to tweak
-                                                                                               // format, decide to explicitly
-                                                                                               // close stream or not
+               // private final rrNvReadable fSettings; // used during write to tweak
+               // format, decide to explicitly
+               // close stream or not
 
                // Optional
                private int fLimit;
@@ -78,6 +86,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
                private String fTopicFilter;
                private boolean fPretty;
                private boolean fWithMeta;
+               ArrayList<Consumer> fKafkaConsumerList;
 
                // private int fOffset;
                /**
@@ -88,13 +97,14 @@ public class CambriaOutboundEventStream implements StreamWriter {
                 */
                public Builder(Consumer c) {
                        this.fConsumer = c;
-                       //this.fSettings = settings;
+                       // this.fSettings = settings;
 
                        fLimit = CambriaConstants.kNoTimeout;
                        fTimeoutMs = CambriaConstants.kNoLimit;
                        fTopicFilter = CambriaConstants.kNoFilter;
                        fPretty = false;
                        fWithMeta = false;
+                       //this.fKafkaConsumerList = consList;
                        // fOffset = CambriaEvents.kNextOffset;
                }
 
@@ -187,31 +197,31 @@ public class CambriaOutboundEventStream implements StreamWriter {
                fConsumer = builder.fConsumer;
                fLimit = builder.fLimit;
                fTimeoutMs = builder.fTimeoutMs;
-               //fSettings = builder.fSettings;
+               // fSettings = builder.fSettings;
                fSent = 0;
                fPretty = builder.fPretty;
                fWithMeta = builder.fWithMeta;
-               
-//             if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
-//                     fHpAlarmFilter = null;
-//                     fHppe = null;
-//             } else {
-//                     try {
-//                             final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
-//                             HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
-//                             fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
-//                             final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
-//                             fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
-//                     } catch (HpReaderException e) {
-//                             // JSON was okay, but the filter engine says it's bogus
-//                             throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
-//                                             "Couldn't create filter: " + e.getMessage());
-//                     } catch (JSONException e) {
-//                             // user sent a bogus JSON object
-//                             throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
-//                                             "Couldn't parse JSON: " + e.getMessage());
-//                     }
-//             }
+               fKafkaConsumerList = builder.fKafkaConsumerList;
+       /*      if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
+                       fHpAlarmFilter = null;
+                       fHppe = null;
+               } else {
+                       try {
+                               final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
+                               HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
+                               fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
+                               final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
+                               fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
+                       } catch (HpReaderException e) {
+                               // JSON was okay, but the filter engine says it's bogus
+                               throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
+                                               "Couldn't create filter: " + e.getMessage());
+                       } catch (JSONException e) {
+                               // user sent a bogus JSON object
+                               throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
+                                               "Couldn't parse JSON: " + e.getMessage());
+                       }
+               }*/
        }
 
        /**
@@ -222,16 +232,20 @@ public class CambriaOutboundEventStream implements StreamWriter {
        public interface operation {
                /**
                 * Call thread.sleep
+                * 
                 * @throws IOException
                 */
                void onWait() throws IOException;
-/**
- * provides the output based in the consumer paramter
- * @param count
- * @param msg
- * @throws IOException
- */
-               void onMessage(int count, Message msg) throws IOException;
+
+               /**
+                * provides the output based in the consumer paramter
+                * 
+                * @param count
+                * @param msg
+                * @throws IOException
+                */
+               // void onMessage(int count, Message msg) throws IOException;
+               void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
        }
 
        /**
@@ -246,74 +260,63 @@ public class CambriaOutboundEventStream implements StreamWriter {
        /**
         * 
         * @param os
-        * throws IOException
+        *            throws IOException
         */
        public void write(final OutputStream os) throws IOException {
-               //final boolean transactionEnabled = topic.isTransactionEnabled();
-               //final boolean transactionEnabled = isTransEnabled();
-               final boolean transactionEnabled = istransEnable;
+               // final boolean transactionEnabled = topic.isTransactionEnabled();
+               // final boolean transactionEnabled = isTransEnabled();
+               // final boolean transactionEnabled = istransEnable;
+               // synchronized(this){
                os.write('[');
-
                fSent = forEachMessage(new operation() {
                        @Override
-                       public void onMessage(int count, Message msg) throws IOException, JSONException {
-
-                               String message = "";
-                               JSONObject jsonMessage = null;
-                               if (transactionEnabled) {
-                                       jsonMessage = new JSONObject(msg.getMessage());
-                                       message = jsonMessage.getString("message");
-                               }
+                       public void onMessage(int count, String msg, String transId, long offSet)
+                                       throws IOException, JSONException {
 
                                if (count > 0) {
                                        os.write(',');
                                }
-
                                if (fWithMeta) {
                                        final JSONObject entry = new JSONObject();
-                                       entry.put("offset", msg.getOffset());
-                                       entry.put("message", message);
+                                       entry.put("offset", offSet);
+                                       entry.put("message", msg);
                                        os.write(entry.toString().getBytes());
                                } else {
-                                       //os.write(message.getBytes());
-                                        String jsonString = "";
-                                       if(transactionEnabled){
-                                               jsonString= JSONObject.valueToString(message);
-                                       }else{
-                                               jsonString = JSONObject.valueToString (msg.getMessage());
-                                               }
-                                       os.write ( jsonString.getBytes () );
+                                       // os.write(message.getBytes());
+                                               String jsonString = JSONObject.valueToString(msg);
+                                       os.write(jsonString.getBytes());
                                }
 
                                if (fPretty) {
                                        os.write('\n');
                                }
 
-                               
-                               String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
-                               if (null==metricTopicname)
-                         metricTopicname="msgrtr.apinode.metrics.dmaap";
-                
-                if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
-                               if (transactionEnabled) {
-                                       final String transactionId = jsonMessage.getString("transactionId");
-                                       responseTransactionId = transactionId;
-
-                                       StringBuilder consumerInfo = new StringBuilder();
-                                       if (null != dmaapContext && null != dmaapContext.getRequest()) {
-                                               final HttpServletRequest request = dmaapContext.getRequest();
-                                               consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
-                                               consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
-                                               consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
-                                               consumerInfo.append(
-                                                               "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
-                                               consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
+                               String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap
+                                               .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
+                               if (null == metricTopicname)
+                                       metricTopicname = "msgrtr.apinode.metrics.dmaap";
+                               if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
+                                       try {
+                                               if (istransEnable && istransType) {
+                                                       // final String transactionId =
+                                                       // jsonMessage.getString("transactionId");
+                                                       // responseTransactionId = transId;
+                                                       StringBuilder consumerInfo = new StringBuilder();
+                                                       if (null != dmaapContext && null != dmaapContext.getRequest()) {
+                                                               final HttpServletRequest request = dmaapContext.getRequest();
+                                                               consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
+                                                               consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
+                                                               consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
+                                                               consumerInfo.append("consumerGroup= \""
+                                                                               + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
+                                                               consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
+                                                       }
+                                                       log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId
+                                                                       + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]");
+                                               }
+                                       } catch (Exception e) {
                                        }
-
-                                       log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId
-                                                       + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]");
                                }
-                }
 
                        }
 
@@ -321,6 +324,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
                        /**
                         * 
                         * It makes thread to wait
+                        * 
                         * @throws IOException
                         */
                        public void onWait() throws IOException {
@@ -329,15 +333,14 @@ public class CambriaOutboundEventStream implements StreamWriter {
                                        // FIXME: would be good to wait/signal
                                        Thread.sleep(100);
                                } catch (InterruptedException e) {
-                                       Log.error(e.toString());
-                               Thread.currentThread().interrupt();     
+                                       // ignore
                                }
                        }
                });
 
-               //if (null != dmaapContext && isTransactionEnabled()) {
-                       if (null != dmaapContext && istransEnable) {
-                       
+               // if (null != dmaapContext && isTransactionEnabled()) {
+               if (null != dmaapContext && istransEnable && istransType) {
+
                        dmaapContext.getResponse().setHeader("transactionId",
                                        Utils.getResponseTransactionId(responseTransactionId));
                }
@@ -346,12 +349,14 @@ public class CambriaOutboundEventStream implements StreamWriter {
                os.flush();
 
                boolean close_out_stream = true;
-               String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream");
-               if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream);
-               
-               //if (fSettings.getBoolean("close.output.stream", true)) {
-                               if (close_out_stream) {
+               String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream");
+               if (null != strclose_out_stream)
+                       close_out_stream = Boolean.parseBoolean(strclose_out_stream);
+
+               // if (fSettings.getBoolean("close.output.stream", true)) {
+               if (close_out_stream) {
                        os.close();
+                       // }
                }
        }
 
@@ -371,21 +376,22 @@ public class CambriaOutboundEventStream implements StreamWriter {
                }
                return null;
        }
-/**
- * 
- * @param op
- * @return
- * @throws IOException
- * @throws JSONException 
- */
+
+       /**
+        * 
+        * @param op
+        * @return
+        * @throws IOException
+        * @throws JSONException
+        */
        public int forEachMessage(operation op) throws IOException, JSONException {
                final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
 
                int count = 0;
                boolean firstPing = true;
-
+               // boolean isTransType=false;
                final long startMs = System.currentTimeMillis();
-               final long timeoutMs = fTimeoutMs + startMs;
+               final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll 
 
                while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
                        if (!firstPing) {
@@ -393,23 +399,63 @@ public class CambriaOutboundEventStream implements StreamWriter {
                        }
                        firstPing = false;
 
-                       Consumer.Message msg = null;
-                       while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) {
+               
+                                Consumer.Message msgRecord = null;
+                                while (count < effectiveLimit && (msgRecord =
+                                fConsumer.nextMessage()) != null) {
 
-                               
                                String message = "";
-                       //      if (topic.isTransactionEnabled() || true) {
-                               if (istransEnable) {
-                                       // As part of DMaaP changes we are wrapping the original
-                                       // message into a json object
-                                       // and then this json object is further wrapped into message
-                                       // object before publishing,
-                                       // so extracting the original message from the message
-                                       // object for matching with filter.
-                                       final JSONObject jsonMessage = new JSONObject(msg.getMessage());
-                                       message = jsonMessage.getString("message");
-                               } else {
-                                       message = msg.getMessage();
+                               String transactionid = "";
+                               try {
+                   // String msgRecord = msg;
+                                       JSONObject jsonMessage = new JSONObject(msgRecord);
+                                       String[] keys = JSONObject.getNames(jsonMessage);
+                                       boolean wrapheader1 = false;
+                                       boolean wrapheader2 = false;
+                                       boolean found_attr3 = false;
+                                       String wrapElement1 = "message";
+                                       String wrapElement2 = "msgWrapMR";
+                                       String transIdElement = "transactionId";
+                                       if (null != keys) {
+                                               for (String key : keys) {
+                                                       if (key.equals(wrapElement1)) {
+                                                               wrapheader1 = true;
+                                                       } else if (key.equals(wrapElement2)) {
+                                                               wrapheader2 = true;
+                                                       } else if (key.equals(transIdElement)) {
+                                                               found_attr3 = true;
+                                                               transactionid = jsonMessage.getString(key);
+                                                       }
+                                               }
+                                       }
+
+                                       // returns contents of attribute 1 if both attributes
+                                       // present, otherwise
+                                       // the whole msg
+                                       if (wrapheader2 && found_attr3) {
+                                               message = jsonMessage.getString(wrapElement2);
+                                       } else if (wrapheader1 && found_attr3) {
+                                               message = jsonMessage.getString(wrapElement1);
+                                       } else {
+                                               message = msgRecord.getMessage();
+                                       }
+                                       // jsonMessage = extractMessage(jsonMessage ,
+                                       // "message","msgWrapMR","transactionId");
+                                       istransType = true;
+                               } catch (JSONException e) { // This check is required for the
+                                                                                       // message sent by MR AAF flow but
+                                                                                       // consumed by UEB ACL flow which
+                                                                                       // wont expect transaction id in
+                                                                                       // cambria client api
+                                       // Ignore
+                                       log.info("JSON Exception logged when the message is non JSON Format");
+                               } catch (Exception exp) {
+                                       log.info("****Some Exception occured for writing messages in topic" + topic.getName()
+                                                       + "  Exception" + exp);
+                               }
+                               if (message == null || message.equals("")) {
+                                       istransType = false;
+                                       message = msgRecord.getMessage();
                                }
 
                                // If filters are enabled/set, message should be in JSON format
@@ -417,23 +463,26 @@ public class CambriaOutboundEventStream implements StreamWriter {
                                // otherwise filter will automatically ignore message in
                                // non-json format.
                                if (filterMatches(message)) {
-                                       op.onMessage(count, msg);
+                                       op.onMessage(count, message, transactionid, msgRecord.getOffset());
                                        count++;
+
                                }
+
                        }
                }
-
                return count;
        }
 
+       
+
        /**
         * 
         * Checks whether filter is initialized
         */
-//     private boolean isFilterInitialized() {
-//             return (fHpAlarmFilter != null && fHppe != null);
-//     }
-
+       /*private boolean isFilterInitialized() {
+               return (fHpAlarmFilter != null && fHppe != null);
+       }
+*/
        /**
         * 
         * @param msg
@@ -441,18 +490,18 @@ public class CambriaOutboundEventStream implements StreamWriter {
         */
        private boolean filterMatches(String msg) {
                boolean result = true;
-//             if (isFilterInitialized()) {
-//                     try {
-//                             final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
-//                             result = fHpAlarmFilter.matches(fHppe, e);
-//                     } catch (JSONException x) {
-//                             // the msg may not be JSON
-//                             result = false;
-//                             log.error("Failed due to " + x.getMessage());
-//                     } catch (Exception x) {
-//                             log.error("Error using filter: " + x.getMessage(), x);
-//                     }
-//             }
+               /*if (isFilterInitialized()) {
+                       try {
+                               final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
+                               result = fHpAlarmFilter.matches(fHppe, e);
+                       } catch (JSONException x) {
+                               // the msg may not be JSON
+                               result = false;
+                               log.error("Failed due to " + x.getMessage());
+                       } catch (Exception x) {
+                               log.error("Error using filter: " + x.getMessage(), x);
+                       }
+               }*/
 
                return result;
        }
@@ -472,48 +521,34 @@ public class CambriaOutboundEventStream implements StreamWriter {
        public void setTopic(Topic topic) {
                this.topic = topic;
        }
-       
+
        public void setTopicStyle(boolean aaftopic) {
                this.isAAFTopic = aaftopic;
        }
-       
-       public void setTransEnabled ( boolean transEnable) {
+
+       public void setTransEnabled(boolean transEnable) {
                this.istransEnable = transEnable;
        }
 
-       /*private boolean isTransactionEnabled() {
-               //return topic.isTransactionEnabled();
-               return true; // let metrics creates for all the topics
-       }*/
-
-       private boolean isTransEnabled() {
-               String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
-               boolean istransidreqd=false;
-               if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){
-                       istransidreqd = true; 
-               }
-               
-               return istransidreqd;
-
-       }
        
        private final Consumer fConsumer;
        private final int fLimit;
        private final int fTimeoutMs;
-       //private final rrNvReadable fSettings;
+       // private final rrNvReadable fSettings;
        private final boolean fPretty;
        private final boolean fWithMeta;
        private int fSent;
 //     private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
-//     private final HpProcessingEngine<HpJsonEvent> fHppe;
+       //private final HpProcessingEngine<HpJsonEvent> fHppe;
        private DMaaPContext dmaapContext;
        private String responseTransactionId;
        private Topic topic;
        private boolean isAAFTopic = false;
        private boolean istransEnable = false;
-       
+       private ArrayList<Consumer> fKafkaConsumerList;
+       private boolean istransType = true;
+       // private static final Logger log =
+       // Logger.getLogger(CambriaOutboundEventStream.class);
 
-       //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class);
-       
        private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
 }
\ No newline at end of file