8a5c4a9416b33086dcad8d4e335ef9f108c8742d
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / resources / CambriaOutboundEventStream.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.resources;
23
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.dmf.mr.CambriaApiException;
import com.att.dmf.mr.backends.Consumer;
import com.att.dmf.mr.beans.DMaaPContext;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.metabroker.Topic;
import com.att.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
import com.att.dmf.mr.utils.Utils;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
48 //import com.att.nsa.drumlin.till.nv.rrNvReadable;
49 /*import com.att.sa.highlandPark.config.HpConfigContext;
50 import com.att.sa.highlandPark.config.HpReaderException;
51 import com.att.sa.highlandPark.events.HpJsonEvent;
52 import com.att.sa.highlandPark.events.HpJsonEventFactory;
53 import com.att.sa.highlandPark.processor.HpAlarmFilter;
54 import com.att.sa.highlandPark.processor.HpEvent;
55 import com.att.sa.highlandPark.processor.HpProcessingEngine;
56 import com.att.sa.highlandPark.processor.HpProcessingEngine.EventFactory;
57 */
58 /**
59  * class used to write the consumed messages
60  * 
61  * @author anowarul.islam
62  *
63  */
64 public class CambriaOutboundEventStream implements StreamWriter {
65         private static final int kTopLimit = 1024 * 4;
66
67         /**
68          * 
69          * static innerclass it takes all the input parameter for kafka consumer
70          * like limit, timeout, meta, pretty
71          * 
72          * @author anowarul.islam
73          *
74          */
75         public static class Builder {
76
77                 // Required
78                 private final Consumer fConsumer;
79                 // private final rrNvReadable fSettings; // used during write to tweak
80                 // format, decide to explicitly
81                 // close stream or not
82
83                 // Optional
84                 private int fLimit;
85                 private int fTimeoutMs;
86                 private String fTopicFilter;
87                 private boolean fPretty;
88                 private boolean fWithMeta;
89                 ArrayList<Consumer> fKafkaConsumerList;
90
91                 // private int fOffset;
92                 /**
93                  * constructor it initializes all the consumer parameters
94                  * 
95                  * @param c
96                  * @param settings
97                  */
98                 public Builder(Consumer c) {
99                         this.fConsumer = c;
100                         // this.fSettings = settings;
101
102                         fLimit = CambriaConstants.kNoTimeout;
103                         fTimeoutMs = CambriaConstants.kNoLimit;
104                         fTopicFilter = CambriaConstants.kNoFilter;
105                         fPretty = false;
106                         fWithMeta = false;
107                         //this.fKafkaConsumerList = consList;
108                         // fOffset = CambriaEvents.kNextOffset;
109                 }
110
111                 /**
112                  * 
113                  * constructor initializes with limit
114                  * 
115                  * @param l
116                  *            only l no of messages will be consumed
117                  * @return
118                  */
119                 public Builder limit(int l) {
120                         this.fLimit = l;
121                         return this;
122                 }
123
124                 /**
125                  * constructor initializes with timeout
126                  * 
127                  * @param t
128                  *            if there is no message to consume, them DMaaP will wait
129                  *            for t time
130                  * @return
131                  */
132                 public Builder timeout(int t) {
133                         this.fTimeoutMs = t;
134                         return this;
135                 }
136
137                 /**
138                  * constructor initializes with filter
139                  * 
140                  * @param f
141                  *            filter
142                  * @return
143                  */
144                 public Builder filter(String f) {
145                         this.fTopicFilter = f;
146                         return this;
147                 }
148
149                 /**
150                  * constructor initializes with boolean value pretty
151                  * 
152                  * @param p
153                  *            messages print in new line
154                  * @return
155                  */
156                 public Builder pretty(boolean p) {
157                         fPretty = p;
158                         return this;
159                 }
160
161                 /**
162                  * constructor initializes with boolean value meta
163                  * 
164                  * @param withMeta,
165                  *            along with messages offset will print
166                  * @return
167                  */
168                 public Builder withMeta(boolean withMeta) {
169                         fWithMeta = withMeta;
170                         return this;
171                 }
172
173                 // public Builder atOffset ( int pos )
174                 
175                 // fOffset = pos;
176                 // return this;
177                 // }
178                 /**
179                  * method returs object of CambriaOutboundEventStream
180                  * 
181                  * @return
182                  * @throws CambriaApiException
183                  */
184                 public CambriaOutboundEventStream build() throws CambriaApiException {
185                         return new CambriaOutboundEventStream(this);
186                 }
187         }
188
189         @SuppressWarnings("unchecked")
190         /**
191          * 
192          * @param builder
193          * @throws CambriaApiException
194          * 
195          */
196         private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
197                 fConsumer = builder.fConsumer;
198                 fLimit = builder.fLimit;
199                 fTimeoutMs = builder.fTimeoutMs;
200                 
201                 fSent = 0;
202                 fPretty = builder.fPretty;
203                 fWithMeta = builder.fWithMeta;
204                 fKafkaConsumerList = builder.fKafkaConsumerList;
205         /*      if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
206                         fHpAlarmFilter = null;
207                         fHppe = null;
208                 } else {
209                         try {
210                                 final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
211                                 HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
212                                 fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
213                                 final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
214                                 fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
215                         } catch (HpReaderException e) {
216                                 // JSON was okay, but the filter engine says it's bogus
217                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
218                                                 "Couldn't create filter: " + e.getMessage());
219                         } catch (JSONException e) {
220                                 // user sent a bogus JSON object
221                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
222                                                 "Couldn't parse JSON: " + e.getMessage());
223                         }
224                 }*/
225         }
226
227         /**
228          * 
229          * interface provides onWait and onMessage methods
230          *
231          */
232         public interface operation {
233                 /**
234                  * Call thread.sleep
235                  * 
236                  * @throws IOException
237                  */
238                 void onWait() throws IOException;
239
240                 /**
241                  * provides the output based in the consumer paramter
242                  * 
243                  * @param count
244                  * @param msg
245                  * @throws IOException
246                  */
247                 
248                 void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
249         }
250
251         /**
252          * 
253          * @return
254          */
255         public int getSentCount() {
256                 return fSent;
257         }
258
259         @Override
260         /**
261          * 
262          * @param os
263          *            throws IOException
264          */
265         public void write(final OutputStream os) throws IOException {
266                 
267                 // final boolean transactionEnabled = isTransEnabled();
268                 // final boolean transactionEnabled = istransEnable;
269                 // synchronized(this){
270                 os.write('[');
271                 fSent = forEachMessage(new operation() {
272                         @Override
273                         public void onMessage(int count, String msg, String transId, long offSet)
274                                         throws IOException, JSONException {
275
276                                 if (count > 0) {
277                                         os.write(',');
278                                 }
279                                 if (fWithMeta) {
280                                         final JSONObject entry = new JSONObject();
281                                         entry.put("offset", offSet);
282                                         entry.put("message", msg);
283                                         os.write(entry.toString().getBytes());
284                                 } else {
285                                         
286                                                 String jsonString = JSONObject.valueToString(msg);
287                                         os.write(jsonString.getBytes());
288                                 }
289
290                                 if (fPretty) {
291                                         os.write('\n');
292                                 }
293
294                                 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap
295                                                 .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
296                                 if (null == metricTopicname)
297                                         metricTopicname = "msgrtr.apinode.metrics.dmaap";
298                                 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
299                                         try {
300                                                 if (istransEnable && istransType) {
301                                                         // final String transactionId =
302                                                         
303                                                         // responseTransactionId = transId;
304                                                         StringBuilder consumerInfo = new StringBuilder();
305                                                         if (null != dmaapContext && null != dmaapContext.getRequest()) {
306                                                                 final HttpServletRequest request = dmaapContext.getRequest();
307                                                                 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
308                                                                 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
309                                                                 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
310                                                                 consumerInfo.append("consumerGroup= \""
311                                                                                 + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
312                                                                 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
313                                                         }
314                                                         log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId
315                                                                         + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]");
316                                                 }
317                                         } catch (Exception e) {
318                                         }
319                                 }
320
321                         }
322
323                         @Override
324                         /**
325                          * 
326                          * It makes thread to wait
327                          * 
328                          * @throws IOException
329                          */
330                         public void onWait() throws IOException {
331                                 os.flush(); // likely totally unnecessary for a network socket
332                                 try {
333                                         // FIXME: would be good to wait/signal
334                                         Thread.sleep(100);
335                                 } catch (InterruptedException e) {
336                                         // ignore
337                                 }
338                         }
339                 });
340
341                 // if (null != dmaapContext && isTransactionEnabled()) {
342                 if (null != dmaapContext && istransEnable && istransType) {
343
344                         dmaapContext.getResponse().setHeader("transactionId",
345                                         Utils.getResponseTransactionId(responseTransactionId));
346                 }
347
348                 os.write(']');
349                 os.flush();
350
351                 boolean close_out_stream = true;
352                 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream");
353                 if (null != strclose_out_stream)
354                         close_out_stream = Boolean.parseBoolean(strclose_out_stream);
355
356                 // if (fSettings.getBoolean("close.output.stream", true)) {
357                 if (close_out_stream) {
358                         os.close();
359                         // }
360                 }
361         }
362
363         /**
364          * 
365          * @param requestURI
366          * @return
367          */
368         private String getConsumerGroupFromRequest(String requestURI) {
369                 if (null != requestURI && !requestURI.isEmpty()) {
370
371                         String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
372
373                         int startIndex = consumerDetails.indexOf("/") + 1;
374                         int endIndex = consumerDetails.lastIndexOf("/");
375                         return consumerDetails.substring(startIndex, endIndex);
376                 }
377                 return null;
378         }
379
380         /**
381          * 
382          * @param op
383          * @return
384          * @throws IOException
385          * @throws JSONException
386          */
387         public int forEachMessage(operation op) throws IOException, JSONException {
388                 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
389
390                 int count = 0;
391                 boolean firstPing = true;
392                 // boolean isTransType=false;
393                 final long startMs = System.currentTimeMillis();
394                 final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll 
395
396                 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
397                         if (!firstPing) {
398                                 op.onWait();
399                         }
400                         firstPing = false;
401
402                 
403                                  Consumer.Message msgRecord = null;
404                                  while (count < effectiveLimit && (msgRecord =
405                                  fConsumer.nextMessage()) != null) {
406
407                                 String message = "";
408                                 String transactionid = "";
409                                 try {
410                    // String msgRecord = msg;
411                                         JSONObject jsonMessage = new JSONObject(msgRecord);
412                                         String[] keys = JSONObject.getNames(jsonMessage);
413                                         boolean wrapheader1 = false;
414                                         boolean wrapheader2 = false;
415                                         boolean found_attr3 = false;
416                                         String wrapElement1 = "message";
417                                         String wrapElement2 = "msgWrapMR";
418                                         String transIdElement = "transactionId";
419                                         if (null != keys) {
420                                                 for (String key : keys) {
421                                                         if (key.equals(wrapElement1)) {
422                                                                 wrapheader1 = true;
423                                                         } else if (key.equals(wrapElement2)) {
424                                                                 wrapheader2 = true;
425                                                         } else if (key.equals(transIdElement)) {
426                                                                 found_attr3 = true;
427                                                                 transactionid = jsonMessage.getString(key);
428                                                         }
429                                                 }
430                                         }
431
432                                         // returns contents of attribute 1 if both attributes
433                                         // present, otherwise
434                                         // the whole msg
435                                         if (wrapheader2 && found_attr3) {
436                                                 message = jsonMessage.getString(wrapElement2);
437                                         } else if (wrapheader1 && found_attr3) {
438                                                 message = jsonMessage.getString(wrapElement1);
439                                         } else {
440                                                 message = msgRecord.getMessage();
441                                         }
442                                         // jsonMessage = extractMessage(jsonMessage ,
443                                         // "message","msgWrapMR","transactionId");
444                                         istransType = true;
445                                 } catch (JSONException e) { // This check is required for the
446                                                                                         // message sent by MR AAF flow but
447                                                                                         // consumed by UEB ACL flow which
448                                                                                         // wont expect transaction id in
449                                                                                         // cambria client api
450                                         // Ignore
451                                         log.info("JSON Exception logged when the message is non JSON Format");
452                                 } catch (Exception exp) {
453                                         log.info("****Some Exception occured for writing messages in topic" + topic.getName()
454                                                         + "  Exception" + exp);
455                                 }
456                                 if (message == null || message.equals("")) {
457                                         istransType = false;
458                                         message = msgRecord.getMessage();
459                                 }
460
461                                 // If filters are enabled/set, message should be in JSON format
462                                 // for filters to work for
463                                 // otherwise filter will automatically ignore message in
464                                 // non-json format.
465                                 if (filterMatches(message)) {
466                                         op.onMessage(count, message, transactionid, msgRecord.getOffset());
467                                         count++;
468
469                                 }
470
471                         }
472                 }
473                 return count;
474         }
475
476         
477
478         /**
479          * 
480          * Checks whether filter is initialized
481          */
482         /*private boolean isFilterInitialized() {
483                 return (fHpAlarmFilter != null && fHppe != null);
484         }
485 */
486         /**
487          * 
488          * @param msg
489          * @return
490          */
491         private boolean filterMatches(String msg) {
492                 boolean result = true;
493                 /*if (isFilterInitialized()) {
494                         try {
495                                 final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
496                                 result = fHpAlarmFilter.matches(fHppe, e);
497                         } catch (JSONException x) {
498                                 // the msg may not be JSON
499                                 result = false;
500                                 log.error("Failed due to " + x.getMessage());
501                         } catch (Exception x) {
502                                 log.error("Error using filter: " + x.getMessage(), x);
503                         }
504                 }*/
505
506                 return result;
507         }
508
509         public DMaaPContext getDmaapContext() {
510                 return dmaapContext;
511         }
512
513         public void setDmaapContext(DMaaPContext dmaapContext) {
514                 this.dmaapContext = dmaapContext;
515         }
516
517         public Topic getTopic() {
518                 return topic;
519         }
520
521         public void setTopic(Topic topic) {
522                 this.topic = topic;
523         }
524
525         public void setTopicStyle(boolean aaftopic) {
526                 this.isAAFTopic = aaftopic;
527         }
528
529         public void setTransEnabled(boolean transEnable) {
530                 this.istransEnable = transEnable;
531         }
532
533         
534         private final Consumer fConsumer;
535         private final int fLimit;
536         private final int fTimeoutMs;
537         // private final rrNvReadable fSettings;
538         private final boolean fPretty;
539         private final boolean fWithMeta;
540         private int fSent;
541 //      private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
542         //private final HpProcessingEngine<HpJsonEvent> fHppe;
543         private DMaaPContext dmaapContext;
544         private String responseTransactionId;
545         private Topic topic;
546         private boolean isAAFTopic = false;
547         private boolean istransEnable = false;
548         private ArrayList<Consumer> fKafkaConsumerList;
549         private boolean istransType = true;
550         // private static final Logger log =
551         // Logger.getLogger(CambriaOutboundEventStream.class);
552
553         private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
554 }