9fbc7f7f9d27b59a72169419e6945a089c5970ab
[dmaap/messagerouter/msgrtr.git] /
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources;
23
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Date;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.CambriaApiException;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Consumer;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Consumer.Message;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans.DMaaPContext;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.constants.CambriaConstants;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Topic;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.DMaaPResponseBuilder.StreamWriter;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.Utils;
46
47
48 /**
49  * class used to write the consumed messages
50  * 
51  * @author author
52  *
53  */
54 public class CambriaOutboundEventStream implements StreamWriter {
55         private static final int kTopLimit = 1024 * 4;
56
57         /**
58          * 
59          * static innerclass it takes all the input parameter for kafka consumer
60          * like limit, timeout, meta, pretty
61          * 
62          * @author author
63          *
64          */
65         public static class Builder {
66
67                 // Required
68                 private final Consumer fConsumer;
69                 //private final rrNvReadable fSettings;   // used during write to tweak
70                                                                                                 // format, decide to explicitly
71                                                                                                 // close stream or not
72
73                 // Optional
74                 private int fLimit;
75                 private int fTimeoutMs;
76                 private String fTopicFilter;
77                 private boolean fPretty;
78                 private boolean fWithMeta;
79
80                 // private int fOffset;
81                 /**
82                  * constructor it initializes all the consumer parameters
83                  * 
84                  * @param c
85                  * @param settings
86                  */
87                 public Builder(Consumer c) {
88                         this.fConsumer = c;
89                         //this.fSettings = settings;
90
91                         fLimit = CambriaConstants.kNoTimeout;
92                         fTimeoutMs = CambriaConstants.kNoLimit;
93                         fTopicFilter = CambriaConstants.kNoFilter;
94                         fPretty = false;
95                         fWithMeta = false;
96                         // fOffset = CambriaEvents.kNextOffset;
97                 }
98
99                 /**
100                  * 
101                  * constructor initializes with limit
102                  * 
103                  * @param l
104                  *            only l no of messages will be consumed
105                  * @return
106                  */
107                 public Builder limit(int l) {
108                         this.fLimit = l;
109                         return this;
110                 }
111
112                 /**
113                  * constructor initializes with timeout
114                  * 
115                  * @param t
116                  *            if there is no message to consume, them DMaaP will wait
117                  *            for t time
118                  * @return
119                  */
120                 public Builder timeout(int t) {
121                         this.fTimeoutMs = t;
122                         return this;
123                 }
124
125                 /**
126                  * constructor initializes with filter
127                  * 
128                  * @param f
129                  *            filter
130                  * @return
131                  */
132                 public Builder filter(String f) {
133                         this.fTopicFilter = f;
134                         return this;
135                 }
136
137                 /**
138                  * constructor initializes with boolean value pretty
139                  * 
140                  * @param p
141                  *            messages print in new line
142                  * @return
143                  */
144                 public Builder pretty(boolean p) {
145                         fPretty = p;
146                         return this;
147                 }
148
149                 /**
150                  * constructor initializes with boolean value meta
151                  * 
152                  * @param withMeta,
153                  *            along with messages offset will print
154                  * @return
155                  */
156                 public Builder withMeta(boolean withMeta) {
157                         fWithMeta = withMeta;
158                         return this;
159                 }
160
161                 // public Builder atOffset ( int pos )
162                 // {
163                 // fOffset = pos;
164                 // return this;
165                 // }
166                 /**
167                  * method returs object of CambriaOutboundEventStream
168                  * 
169                  * @return
170                  * @throws CambriaApiException
171                  */
172                 public CambriaOutboundEventStream build() throws CambriaApiException {
173                         return new CambriaOutboundEventStream(this);
174                 }
175         }
176
177         @SuppressWarnings("unchecked")
178         /**
179          * 
180          * @param builder
181          * @throws CambriaApiException
182          * 
183          */
184         private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
185                 fConsumer = builder.fConsumer;
186                 fLimit = builder.fLimit;
187                 fTimeoutMs = builder.fTimeoutMs;
188                 //fSettings = builder.fSettings;
189                 fSent = 0;
190                 fPretty = builder.fPretty;
191                 fWithMeta = builder.fWithMeta;
192                 
193 //              if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
194 //                      fHpAlarmFilter = null;
195 //                      fHppe = null;
196 //              } else {
197 //                      try {
198 //                              final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
199 //                              HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
200 //                              fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
201 //                              final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
202 //                              fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
203 //                      } catch (HpReaderException e) {
204 //                              // JSON was okay, but the filter engine says it's bogus
205 //                              throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
206 //                                              "Couldn't create filter: " + e.getMessage());
207 //                      } catch (JSONException e) {
208 //                              // user sent a bogus JSON object
209 //                              throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
210 //                                              "Couldn't parse JSON: " + e.getMessage());
211 //                      }
212 //              }
213         }
214
215         /**
216          * 
217          * interface provides onWait and onMessage methods
218          *
219          */
220         public interface operation {
221                 /**
222                  * Call thread.sleep
223                  * @throws IOException
224                  */
225                 void onWait() throws IOException;
226 /**
227  * provides the output based in the consumer paramter
228  * @param count
229  * @param msg
230  * @throws IOException
231  */
232                 void onMessage(int count, Message msg) throws IOException;
233         }
234
235         /**
236          * 
237          * @return
238          */
239         public int getSentCount() {
240                 return fSent;
241         }
242
243         @Override
244         /**
245          * 
246          * @param os
247          * throws IOException
248          */
249         public void write(final OutputStream os) throws IOException {
250                 //final boolean transactionEnabled = topic.isTransactionEnabled();
251                 //final boolean transactionEnabled = isTransEnabled();
252                 final boolean transactionEnabled = istransEnable;
253                 os.write('[');
254
255                 fSent = forEachMessage(new operation() {
256                         @Override
257                         public void onMessage(int count, Message msg) throws IOException, JSONException {
258
259                                 String message = "";
260                                 JSONObject jsonMessage = null;
261                                 if (transactionEnabled) {
262                                         jsonMessage = new JSONObject(msg.getMessage());
263                                         message = jsonMessage.getString("message");
264                                 }
265
266                                 if (count > 0) {
267                                         os.write(',');
268                                 }
269
270                                 if (fWithMeta) {
271                                         final JSONObject entry = new JSONObject();
272                                         entry.put("offset", msg.getOffset());
273                                         entry.put("message", message);
274                                         os.write(entry.toString().getBytes());
275                                 } else {
276                                         //os.write(message.getBytes());
277                                          String jsonString = "";
278                                         if(transactionEnabled){
279                                                 jsonString= JSONObject.valueToString(message);
280                                         }else{
281                                                 jsonString = JSONObject.valueToString (msg.getMessage());
282                                                 }
283                                         os.write ( jsonString.getBytes () );
284                                 }
285
286                                 if (fPretty) {
287                                         os.write('\n');
288                                 }
289
290                                 
291                                 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
292                                 if (null==metricTopicname)
293                           metricTopicname="msgrtr.apinode.metrics.dmaap";
294                  
295                  if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
296                                 if (transactionEnabled) {
297                                         final String transactionId = jsonMessage.getString("transactionId");
298                                         responseTransactionId = transactionId;
299
300                                         StringBuilder consumerInfo = new StringBuilder();
301                                         if (null != dmaapContext && null != dmaapContext.getRequest()) {
302                                                 final HttpServletRequest request = dmaapContext.getRequest();
303                                                 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
304                                                 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
305                                                 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
306                                                 consumerInfo.append(
307                                                                 "consumerGroup= \"" + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
308                                                 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
309                                         }
310
311                                         log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transactionId
312                                                         + "\",messageLength= \"" + message.length() + "\",topic= \"" + topic.getName() + "\"]");
313                                 }
314                  }
315
316                         }
317
318                         @Override
319                         /**
320                          * 
321                          * It makes thread to wait
322                          * @throws IOException
323                          */
324                         public void onWait() throws IOException {
325                                 os.flush(); // likely totally unnecessary for a network socket
326                                 try {
327                                         // FIXME: would be good to wait/signal
328                                         Thread.sleep(100);
329                                 } catch (InterruptedException e) {
330                                         // ignore
331                                 }
332                         }
333                 });
334
335                 //if (null != dmaapContext && isTransactionEnabled()) {
336                         if (null != dmaapContext && istransEnable) {
337                         
338                         dmaapContext.getResponse().setHeader("transactionId",
339                                         Utils.getResponseTransactionId(responseTransactionId));
340                 }
341
342                 os.write(']');
343                 os.flush();
344
345                 boolean close_out_stream = true;
346                 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"close.output.stream");
347                 if(null!=strclose_out_stream)close_out_stream=Boolean.parseBoolean(strclose_out_stream);
348                 
349                 //if (fSettings.getBoolean("close.output.stream", true)) {
350                                 if (close_out_stream) {
351                         os.close();
352                 }
353         }
354
355         /**
356          * 
357          * @param requestURI
358          * @return
359          */
360         private String getConsumerGroupFromRequest(String requestURI) {
361                 if (null != requestURI && !requestURI.isEmpty()) {
362
363                         String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
364
365                         int startIndex = consumerDetails.indexOf("/") + 1;
366                         int endIndex = consumerDetails.lastIndexOf("/");
367                         return consumerDetails.substring(startIndex, endIndex);
368                 }
369                 return null;
370         }
371 /**
372  * 
373  * @param op
374  * @return
375  * @throws IOException
376  * @throws JSONException 
377  */
378         public int forEachMessage(operation op) throws IOException, JSONException {
379                 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
380
381                 int count = 0;
382                 boolean firstPing = true;
383
384                 final long startMs = System.currentTimeMillis();
385                 final long timeoutMs = fTimeoutMs + startMs;
386
387                 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
388                         if (!firstPing) {
389                                 op.onWait();
390                         }
391                         firstPing = false;
392
393                         Consumer.Message msg = null;
394                         while (count < effectiveLimit && (msg = fConsumer.nextMessage()) != null) {
395
396                                 
397                                 String message = "";
398                         //      if (topic.isTransactionEnabled() || true) {
399                                 if (istransEnable) {
400                                         // As part of DMaaP changes we are wrapping the original
401                                         // message into a json object
402                                         // and then this json object is further wrapped into message
403                                         // object before publishing,
404                                         // so extracting the original message from the message
405                                         // object for matching with filter.
406                                         final JSONObject jsonMessage = new JSONObject(msg.getMessage());
407                                         message = jsonMessage.getString("message");
408                                 } else {
409                                         message = msg.getMessage();
410                                 }
411
412                                 // If filters are enabled/set, message should be in JSON format
413                                 // for filters to work for
414                                 // otherwise filter will automatically ignore message in
415                                 // non-json format.
416                                 if (filterMatches(message)) {
417                                         op.onMessage(count, msg);
418                                         count++;
419                                 }
420                         }
421                 }
422
423                 return count;
424         }
425
426         /**
427          * 
428          * Checks whether filter is initialized
429          */
430 //      private boolean isFilterInitialized() {
431 //              return (fHpAlarmFilter != null && fHppe != null);
432 //      }
433
434         /**
435          * 
436          * @param msg
437          * @return
438          */
439         private boolean filterMatches(String msg) {
440                 boolean result = true;
441 //              if (isFilterInitialized()) {
442 //                      try {
443 //                              final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
444 //                              result = fHpAlarmFilter.matches(fHppe, e);
445 //                      } catch (JSONException x) {
446 //                              // the msg may not be JSON
447 //                              result = false;
448 //                              log.error("Failed due to " + x.getMessage());
449 //                      } catch (Exception x) {
450 //                              log.error("Error using filter: " + x.getMessage(), x);
451 //                      }
452 //              }
453
454                 return result;
455         }
456
457         public DMaaPContext getDmaapContext() {
458                 return dmaapContext;
459         }
460
461         public void setDmaapContext(DMaaPContext dmaapContext) {
462                 this.dmaapContext = dmaapContext;
463         }
464
465         public Topic getTopic() {
466                 return topic;
467         }
468
469         public void setTopic(Topic topic) {
470                 this.topic = topic;
471         }
472         
473         public void setTopicStyle(boolean aaftopic) {
474                 this.isAAFTopic = aaftopic;
475         }
476         
477         public void setTransEnabled ( boolean transEnable) {
478                 this.istransEnable = transEnable;
479         }
480
481         /*private boolean isTransactionEnabled() {
482                 //return topic.isTransactionEnabled();
483                 return true; // let metrics creates for all the topics
484         }*/
485
486         private boolean isTransEnabled() {
487                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
488                 boolean istransidreqd=false;
489                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) || isAAFTopic){
490                         istransidreqd = true; 
491                 }
492                 
493                 return istransidreqd;
494
495         }
496         
497         private final Consumer fConsumer;
498         private final int fLimit;
499         private final int fTimeoutMs;
500         //private final rrNvReadable fSettings;
501         private final boolean fPretty;
502         private final boolean fWithMeta;
503         private int fSent;
504 //      private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
505 //      private final HpProcessingEngine<HpJsonEvent> fHppe;
506         private DMaaPContext dmaapContext;
507         private String responseTransactionId;
508         private Topic topic;
509         private boolean isAAFTopic = false;
510         private boolean istransEnable = false;
511         
512
513         //private static final Logger log = Logger.getLogger(CambriaOutboundEventStream.class);
514         
515         private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
516 }