7366ddebe0f44f49fdd62d94cfe6b69179ca6fe2
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / resources / CambriaOutboundEventStream.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.resources;
23
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.util.Date;
27
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30
31 import com.att.eelf.configuration.EELFLogger;
32 import com.att.eelf.configuration.EELFManager;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.json.JSONTokener;
36
37 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
38 import com.att.nsa.cambria.CambriaApiException;
39 import com.att.nsa.cambria.backends.Consumer;
40 import com.att.nsa.cambria.backends.Consumer.Message;
41 import com.att.nsa.cambria.beans.DMaaPContext;
42 import com.att.nsa.cambria.constants.CambriaConstants;
43 import com.att.nsa.cambria.metabroker.Topic;
44 import com.att.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter;
45 import com.att.nsa.cambria.utils.Utils;
46
47 import jline.internal.Log;
48
49
50 /**
51  * class used to write the consumed messages
52  * 
53  * @author author
54  *
55  */
56 public class CambriaOutboundEventStream implements StreamWriter {
57         private static final int kTopLimit = 1024 * 4;
58
59         /**
60          * 
61          * static innerclass it takes all the input parameter for kafka consumer
62          * like limit, timeout, meta, pretty
63          * 
64          * @author author
65          *
66          */
67         public static class Builder {
68
69                 // Required
70                 private final Consumer fConsumer;
71                 //private final rrNvReadable fSettings;   // used during write to tweak
72                                                                                                 // format, decide to explicitly
73                                                                                                 // close stream or not
74
75                 // Optional
76                 private int fLimit;
77                 private int fTimeoutMs;
78                 private String fTopicFilter;
79                 private boolean fPretty;
80                 private boolean fWithMeta;
81
82                 // private int fOffset;
83                 /**
84                  * constructor it initializes all the consumer parameters
85                  * 
86                  * @param c
87                  * @param settings
88                  */
89                 public Builder(Consumer c) {
90                         this.fConsumer = c;
91                         //this.fSettings = settings;
92
93                         fLimit = CambriaConstants.kNoTimeout;
94                         fTimeoutMs = CambriaConstants.kNoLimit;
95                         fTopicFilter = CambriaConstants.kNoFilter;
96                         fPretty = false;
97                         fWithMeta = false;
98                         // fOffset = CambriaEvents.kNextOffset;
99                 }
100
101                 /**
102                  * 
103                  * constructor initializes with limit
104                  * 
105                  * @param l
106                  *            only l no of messages will be consumed
107                  * @return
108                  */
109                 public Builder limit(int l) {
110                         this.fLimit = l;
111                         return this;
112                 }
113
114                 /**
115                  * constructor initializes with timeout
116                  * 
117                  * @param t
118                  *            if there is no message to consume, them DMaaP will wait
119                  *            for t time
120                  * @return
121                  */
122                 public Builder timeout(int t) {
123                         this.fTimeoutMs = t;
124                         return this;
125                 }
126
127                 /**
128                  * constructor initializes with filter
129                  * 
130                  * @param f
131                  *            filter
132                  * @return
133                  */
134                 public Builder filter(String f) {
135                         this.fTopicFilter = f;
136                         return this;
137                 }
138
139                 /**
140                  * constructor initializes with boolean value pretty
141                  * 
142                  * @param p
143                  *            messages print in new line
144                  * @return
145                  */
146                 public Builder pretty(boolean p) {
147                         fPretty = p;
148                         return this;
149                 }
150
151                 /**
152                  * constructor initializes with boolean value meta
153                  * 
154                  * @param withMeta,
155                  *            along with messages offset will print
156                  * @return
157                  */
158                 public Builder withMeta(boolean withMeta) {
159                         fWithMeta = withMeta;
160                         return this;
161                 }
162
163                 // public Builder atOffset ( int pos )
164                 // {
165                 // fOffset = pos;
166                 // return this;
167                 // }
168                 /**
169                  * method returs object of CambriaOutboundEventStream
170                  * 
171                  * @return
172                  * @throws CambriaApiException
173                  */
174                 public CambriaOutboundEventStream build() throws CambriaApiException {
175                         return new CambriaOutboundEventStream(this);
176                 }
177         }
178
179         @SuppressWarnings("unchecked")
180         /**
181          * 
182          * @param builder
183          * @throws CambriaApiException
184          * 
185          */
186         private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
187                 fConsumer = builder.fConsumer;
188                 fLimit = builder.fLimit;
189                 fTimeoutMs = builder.fTimeoutMs;
190                 //fSettings = builder.fSettings;
191                 fSent = 0;
192                 fPretty = builder.fPretty;
193                 fWithMeta = builder.fWithMeta;
194                 
195 //              if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
196 //                      fHpAlarmFilter = null;
197 //                      fHppe = null;
198 //              } else {
199 //                      try {
200 //                              final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
201 //                              HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
202 //                              fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
203 //                              final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
204 //                              fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
205 //                      } catch (HpReaderException e) {
206 //                              // JSON was okay, but the filter engine says it's bogus
207 //                              throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
208 //                                              "Couldn't create filter: " + e.getMessage());
209 //                      } catch (JSONException e) {
210 //                              // user sent a bogus JSON object
211 //                              throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
212 //                                              "Couldn't parse JSON: " + e.getMessage());
213 //                      }
214 //              }
215         }
216
217         /**
218          * 
219          * interface provides onWait and onMessage methods
220          *
221          */
222         public interface operation {
223                 /**
224                  * Call thread.sleep
225                  * @throws IOException
226                  */
227                 void onWait() throws IOException;
228 /**
229  * provides the output based in the consumer paramter
230  * @param count
231  * @param msg
232  * @throws IOException
233  */
234                 void onMessage(int count, Message msg) throws IOException;
235         }
236
237         /**
238          * 
239          * @return
240          */
241         public int getSentCount() {
242                 return fSent;
243         }
244
245         @Override
246         /**
247          * 
248          * @param os
249          * throws IOException
250          */
251         public void write(final OutputStream os) throws IOException {
252                 //final boolean transactionEnabled = topic.isTransactionEnabled();
253                 //final boolean transactionEnabled = isTransEnabled();
254                 final boolean transactionEnabled = istransEnable;
255                 os.write('[');
256
257                 fSent = forEachMessage(new operation() {
258                         @Override
259                         public void onMessage(int count, Message msg) throws IOException, JSONException {
260
261                                 String message = "";
262                                 JSONObject jsonMessage = null;
263                                 if (transactionEnabled) {
264                                         jsonMessage = new JSONObject(msg.getMessage());
265                                         message = jsonMessage.getString("message");
266                                 }
267
268                                 if (count > 0) {
269                                         os.write(',');
270                                 }
271
272                                 if (fWithMeta) {
273                                         final JSONObject entry = new JSONObject();
274                                         entry.put("offset", msg.getOffset());
275                                         entry.put("message", message);
276                                         os.write(entry.toString().getBytes());
277                                 } else {
278                                         //os.write(message.getBytes());
279                                          String jsonString = "";
280                                         if(transactionEnabled){
281                                                 jsonString= JSONObject.valueToString(message);
282                                         }else{
283                                                 jsonString = JSONObject.valueToString (msg.getMessage());
284                                                 }
285                                         os.write ( jsonString.getBytes () );
286                                 }
287
288                                 if (fPretty) {
289                                         os.write('\n');
290                                 }
291
292                                 
293                                 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
294                                 if (null==metricTopicname)
295                           metricTopicname="msgrtr.apinode.metrics.dmaap";
296                  
297                  if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
298                                 if (transactionEnabled) {
299                                         final String transactionId = jsonMessage.getString("transactionId");
300                                         responseTransactionId = transactionId;
301
302                                         StringBuilder consumerInfo = new StringBuilder();
303                                         if (null != dmaapContext && null != dmaapContext.getRequest()) {
304                                                 final HttpServletRequest request = dmaapContext.getRequest();
305                                                 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
306                                                 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
307                                                 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
308                                                 consumerInfo.append(
309                                                                 "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
310                                                 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
311                                         }
312
313                                         log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId
314                                                         + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]");
315                                 }
316                  }
317
318                         }
319
320                         @Override
321                         /**
322                          * 
323                          * It makes thread to wait
324                          * @throws IOException
325                          */
326                         public void onWait() throws IOException {
327                                 os.flush(); // likely totally unnecessary for a network socket
328                                 try {
329                                         // FIXME: would be good to wait/signal
330                                         Thread.sleep(100);
331                                 } catch (InterruptedException e) {
332                                         Log.error(e.toString());
333                                 Thread.currentThread().interrupt();     
334                                 }
335                         }
336                 });
337
338                 //if (null != dmaapContext && isTransactionEnabled()) {
339                         if (null != dmaapContext && istransEnable) {
340                         
341                         dmaapContext.getResponse().setHeader("transactionId",
342                                         Utils.getResponseTransactionId(responseTransactionId));
343                 }
344
345                 os.write(']');
346                 os.flush();
347
348                 boolean close_out_stream = true;
349                 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream");
350                 if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream);
351                 
352                 //if (fSettings.getBoolean("close.output.stream", true)) {
353                                 if (close_out_stream) {
354                         os.close();
355                 }
356         }
357
358         /**
359          * 
360          * @param requestURI
361          * @return
362          */
363         private String getConsumerGroupFromRequest(String requestURI) {
364                 if (null != requestURI && !requestURI.isEmpty()) {
365
366                         String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
367
368                         int startIndex = consumerDetails.indexOf("/") + 1;
369                         int endIndex = consumerDetails.lastIndexOf("/");
370                         return consumerDetails.substring(startIndex, endIndex);
371                 }
372                 return null;
373         }
374 /**
375  * 
376  * @param op
377  * @return
378  * @throws IOException
379  * @throws JSONException 
380  */
381         public int forEachMessage(operation op) throws IOException, JSONException {
382                 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
383
384                 int count = 0;
385                 boolean firstPing = true;
386
387                 final long startMs = System.currentTimeMillis();
388                 final long timeoutMs = fTimeoutMs + startMs;
389
390                 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
391                         if (!firstPing) {
392                                 op.onWait();
393                         }
394                         firstPing = false;
395
396                         Consumer.Message msg = null;
397                         while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) {
398
399                                 
400                                 String message = "";
401                         //      if (topic.isTransactionEnabled() || true) {
402                                 if (istransEnable) {
403                                         // As part of DMaaP changes we are wrapping the original
404                                         // message into a json object
405                                         // and then this json object is further wrapped into message
406                                         // object before publishing,
407                                         // so extracting the original message from the message
408                                         // object for matching with filter.
409                                         final JSONObject jsonMessage = new JSONObject(msg.getMessage());
410                                         message = jsonMessage.getString("message");
411                                 } else {
412                                         message = msg.getMessage();
413                                 }
414
415                                 // If filters are enabled/set, message should be in JSON format
416                                 // for filters to work for
417                                 // otherwise filter will automatically ignore message in
418                                 // non-json format.
419                                 if (filterMatches(message)) {
420                                         op.onMessage(count, msg);
421                                         count++;
422                                 }
423                         }
424                 }
425
426                 return count;
427         }
428
429         /**
430          * 
431          * Checks whether filter is initialized
432          */
433 //      private boolean isFilterInitialized() {
434 //              return (fHpAlarmFilter != null && fHppe != null);
435 //      }
436
437         /**
438          * 
439          * @param msg
440          * @return
441          */
442         private boolean filterMatches(String msg) {
443                 boolean result = true;
444 //              if (isFilterInitialized()) {
445 //                      try {
446 //                              final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
447 //                              result = fHpAlarmFilter.matches(fHppe, e);
448 //                      } catch (JSONException x) {
449 //                              // the msg may not be JSON
450 //                              result = false;
451 //                              log.error("Failed due to " + x.getMessage());
452 //                      } catch (Exception x) {
453 //                              log.error("Error using filter: " + x.getMessage(), x);
454 //                      }
455 //              }
456
457                 return result;
458         }
459
460         public DMaaPContext getDmaapContext() {
461                 return dmaapContext;
462         }
463
464         public void setDmaapContext(DMaaPContext dmaapContext) {
465                 this.dmaapContext = dmaapContext;
466         }
467
468         public Topic getTopic() {
469                 return topic;
470         }
471
472         public void setTopic(Topic topic) {
473                 this.topic = topic;
474         }
475         
476         public void setTopicStyle(boolean aaftopic) {
477                 this.isAAFTopic = aaftopic;
478         }
479         
480         public void setTransEnabled ( boolean transEnable) {
481                 this.istransEnable = transEnable;
482         }
483
484         /*private boolean isTransactionEnabled() {
485                 //return topic.isTransactionEnabled();
486                 return true; // let metrics creates for all the topics
487         }*/
488
489         private boolean isTransEnabled() {
490                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
491                 boolean istransidreqd=false;
492                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){
493                         istransidreqd = true; 
494                 }
495                 
496                 return istransidreqd;
497
498         }
499         
500         private final Consumer fConsumer;
501         private final int fLimit;
502         private final int fTimeoutMs;
503         //private final rrNvReadable fSettings;
504         private final boolean fPretty;
505         private final boolean fWithMeta;
506         private int fSent;
507 //      private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
508 //      private final HpProcessingEngine<HpJsonEvent> fHppe;
509         private DMaaPContext dmaapContext;
510         private String responseTransactionId;
511         private Topic topic;
512         private boolean isAAFTopic = false;
513         private boolean istransEnable = false;
514         
515
516         //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class);
517         
518         private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
519 }