fixing code smells
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / resources / CambriaOutboundEventStream.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources;
23
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.util.ArrayList;
27 import java.util.Date;
28
29 import javax.servlet.http.HttpServletRequest;
30 import org.json.JSONException;
31 import org.json.JSONObject;
32
33 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
34 import org.onap.dmaap.dmf.mr.CambriaApiException;
35 import org.onap.dmaap.dmf.mr.backends.Consumer;
36 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
37 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
38 import org.onap.dmaap.dmf.mr.metabroker.Topic;
39 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
40 import org.onap.dmaap.dmf.mr.utils.Utils;
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
43
44
45
46
47
48
49
50
51
52
53 /**
54  * class used to write the consumed messages
55  * 
56  * @author anowarul.islam
57  *
58  */
59 public class CambriaOutboundEventStream implements StreamWriter {
60         private static final int kTopLimit = 1024 * 4;
61
62         /**
63          * 
64          * static innerclass it takes all the input parameter for kafka consumer
65          * like limit, timeout, meta, pretty
66          * 
67          * @author anowarul.islam
68          *
69          */
70         public static class Builder {
71
72                 // Required
73                 private final Consumer fConsumer;
74                 // private final rrNvReadable fSettings; // used during write to tweak
75                 // format, decide to explicitly
76                 // close stream or not
77
78                 // Optional
79                 private int fLimit;
80                 private int fTimeoutMs;
81                 private String fTopicFilter;
82                 private boolean fPretty;
83                 private boolean fWithMeta;
84                 ArrayList<Consumer> fKafkaConsumerList;
85
86                 
87                 /**
88                  * constructor it initializes all the consumer parameters
89                  * 
90                  * @param c
91                  * @param settings
92                  */
93                 public Builder(Consumer c) {
94                         this.fConsumer = c;
95                         
96
97                         fLimit = CambriaConstants.kNoTimeout;
98                         fTimeoutMs = CambriaConstants.kNoLimit;
99                         fTopicFilter = CambriaConstants.kNoFilter;
100                         fPretty = false;
101                         fWithMeta = false;
102                         
103         
104                 }
105
106                 /**
107                  * 
108                  * constructor initializes with limit
109                  * 
110                  * @param l
111                  *            only l no of messages will be consumed
112                  * @return
113                  */
114                 public Builder limit(int l) {
115                         this.fLimit = l;
116                         return this;
117                 }
118
119                 /**
120                  * constructor initializes with timeout
121                  * 
122                  * @param t
123                  *            if there is no message to consume, them DMaaP will wait
124                  *            for t time
125                  * @return
126                  */
127                 public Builder timeout(int t) {
128                         this.fTimeoutMs = t;
129                         return this;
130                 }
131
132                 /**
133                  * constructor initializes with filter
134                  * 
135                  * @param f
136                  *            filter
137                  * @return
138                  */
139                 public Builder filter(String f) {
140                         this.fTopicFilter = f;
141                         return this;
142                 }
143
144                 /**
145                  * constructor initializes with boolean value pretty
146                  * 
147                  * @param p
148                  *            messages print in new line
149                  * @return
150                  */
151                 public Builder pretty(boolean p) {
152                         fPretty = p;
153                         return this;
154                 }
155
156                 /**
157                  * constructor initializes with boolean value meta
158                  * 
159                  * @param withMeta,
160                  *            along with messages offset will print
161                  * @return
162                  */
163                 public Builder withMeta(boolean withMeta) {
164                         fWithMeta = withMeta;
165                         return this;
166                 }
167
168                 // public Builder atOffset ( int pos )
169                 
170         
171                 
172                 // }
173                 /**
174                  * method returs object of CambriaOutboundEventStream
175                  * 
176                  * @return
177                  * @throws CambriaApiException
178                  */
179                 public CambriaOutboundEventStream build() throws CambriaApiException {
180                         return new CambriaOutboundEventStream(this);
181                 }
182         }
183
184         @SuppressWarnings("unchecked")
185         /**
186          * 
187          * @param builder
188          * @throws CambriaApiException
189          * 
190          */
191         private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
192                 fConsumer = builder.fConsumer;
193                 fLimit = builder.fLimit;
194                 fTimeoutMs = builder.fTimeoutMs;
195                 
196                 fSent = 0;
197                 fPretty = builder.fPretty;
198                 fWithMeta = builder.fWithMeta;
199                 fKafkaConsumerList = builder.fKafkaConsumerList;
200         
201                         
202                         
203                 
204                         
205                                 
206                                 
207                                 
208                                 
209                                 
210                         
211                                 
212                         
213                                                 
214                         
215                                 
216                                 
217                                         
218                 
219         
220         }
221
222         /**
223          * 
224          * interface provides onWait and onMessage methods
225          *
226          */
227         public interface operation {
228                 /**
229                  * Call thread.sleep
230                  * 
231                  * @throws IOException
232                  */
233                 void onWait() throws IOException;
234
235                 /**
236                  * provides the output based in the consumer paramter
237                  * 
238                  * @param count
239                  * @param msg
240                  * @throws IOException
241                  */
242                 
243                 void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
244         }
245
246         /**
247          * 
248          * @return
249          */
250         public int getSentCount() {
251                 return fSent;
252         }
253
254         @Override
255         /**
256          * 
257          * @param os
258          *            throws IOException
259          */
260         public void write(final OutputStream os) throws IOException {
261                 
262         
263                 
264                 // synchronized(this){
265                 os.write('[');
266                 fSent = forEachMessage(new operation() {
267                         @Override
268                         public void onMessage(int count, String msg, String transId, long offSet)
269                                         throws IOException, JSONException {
270
271                                 if (count > 0) {
272                                         os.write(',');
273                                 }
274                                 if (fWithMeta) {
275                                         final JSONObject entry = new JSONObject();
276                                         entry.put("offset", offSet);
277                                         entry.put("message", msg);
278                                         os.write(entry.toString().getBytes());
279                                 } else {
280                                         
281                                                 String jsonString = JSONObject.valueToString(msg);
282                                         os.write(jsonString.getBytes());
283                                 }
284
285                                 if (fPretty) {
286                                         os.write('\n');
287                                 }
288
289                                 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap
290                                                 .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
291                                 if (null == metricTopicname)
292                                         metricTopicname = "msgrtr.apinode.metrics.dmaap";
293                                 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
294                                         try {
295                                                 if (istransEnable && istransType) {
296                                                         // final String transactionId =
297                                                         
298                                                         
299                                                         StringBuilder consumerInfo = new StringBuilder();
300                                                         if (null != dmaapContext && null != dmaapContext.getRequest()) {
301                                                                 final HttpServletRequest request = dmaapContext.getRequest();
302                                                                 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
303                                                                 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
304                                                                 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
305                                                                 consumerInfo.append("consumerGroup= \""
306                                                                                 + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
307                                                                 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
308                                                         }
309                                                         log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId
310                                                                         + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]");
311                                                 }
312                                         } catch (Exception e) {
313                                         }
314                                 }
315
316                         }
317
318                         @Override
319                         /**
320                          * 
321                          * It makes thread to wait
322                          * 
323                          * @throws IOException
324                          */
325                         public void onWait() throws IOException {
326                                 os.flush(); // likely totally unnecessary for a network socket
327                                 try {
328                                         // FIXME: would be good to wait/signal
329                                         Thread.sleep(100);
330                                 } catch (InterruptedException e) {
331                                     Thread.currentThread().interrupt();
332                                 }
333                         }
334                 });
335
336                 
337                 if (null != dmaapContext && istransEnable && istransType) {
338
339                         dmaapContext.getResponse().setHeader("transactionId",
340                                         Utils.getResponseTransactionId(responseTransactionId));
341                 }
342
343                 os.write(']');
344                 os.flush();
345
346                 boolean close_out_stream = true;
347                 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream");
348                 if (null != strclose_out_stream)
349                         close_out_stream = Boolean.parseBoolean(strclose_out_stream);
350
351                 
352                 if (close_out_stream) {
353                         os.close();
354                         
355                 }
356         }
357
358         /**
359          * 
360          * @param requestURI
361          * @return
362          */
363         private String getConsumerGroupFromRequest(String requestURI) {
364                 if (null != requestURI && !requestURI.isEmpty()) {
365
366                         String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
367
368                         int startIndex = consumerDetails.indexOf("/") + 1;
369                         int endIndex = consumerDetails.lastIndexOf("/");
370                         return consumerDetails.substring(startIndex, endIndex);
371                 }
372                 return null;
373         }
374
375         /**
376          * 
377          * @param op
378          * @return
379          * @throws IOException
380          * @throws JSONException
381          */
382         public int forEachMessage(operation op) throws IOException, JSONException {
383                 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
384
385                 int count = 0;
386                 boolean firstPing = true;
387                 // boolean isTransType=false;
388                 final long startMs = System.currentTimeMillis();
389                 final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll 
390
391                 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
392                         if (!firstPing) {
393                                 op.onWait();
394                         }
395                         firstPing = false;
396
397                 
398                                  Consumer.Message msgRecord = null;
399                                  while (count < effectiveLimit && (msgRecord =
400                                  fConsumer.nextMessage()) != null) {
401
402                                 String message = "";
403                                 String transactionid = "";
404                                 try {
405                    // String msgRecord = msg;
406                                         JSONObject jsonMessage = new JSONObject(msgRecord);
407                                         String[] keys = JSONObject.getNames(jsonMessage);
408                                         boolean wrapheader1 = false;
409                                         boolean wrapheader2 = false;
410                                         boolean found_attr3 = false;
411                                         String wrapElement1 = "message";
412                                         String wrapElement2 = "msgWrapMR";
413                                         String transIdElement = "transactionId";
414                                         if (null != keys) {
415                                                 for (String key : keys) {
416                                                         if (key.equals(wrapElement1)) {
417                                                                 wrapheader1 = true;
418                                                         } else if (key.equals(wrapElement2)) {
419                                                                 wrapheader2 = true;
420                                                         } else if (key.equals(transIdElement)) {
421                                                                 found_attr3 = true;
422                                                                 transactionid = jsonMessage.getString(key);
423                                                         }
424                                                 }
425                                         }
426
427                                         // returns contents of attribute 1 if both attributes
428                                         // present, otherwise
429                                         // the whole msg
430                                         if (wrapheader2 && found_attr3) {
431                                                 message = jsonMessage.getString(wrapElement2);
432                                         } else if (wrapheader1 && found_attr3) {
433                                                 message = jsonMessage.getString(wrapElement1);
434                                         } else {
435                                                 message = msgRecord.getMessage();
436                                         }
437                                         // jsonMessage = extractMessage(jsonMessage ,
438                                         // "message","msgWrapMR","transactionId");
439                                         istransType = true;
440                                 } catch (JSONException e) { // This check is required for the
441                                                                                         // message sent by MR AAF flow but
442                                                                                         // consumed by UEB ACL flow which
443                                                                                         // wont expect transaction id in
444                                                                                         // cambria client api
445                                         // Ignore
446                                         log.info("JSON Exception logged when the message is non JSON Format");
447                                 } catch (Exception exp) {
448                                         log.info("****Some Exception occured for writing messages in topic" + topic.getName()
449                                                         + "  Exception" + exp);
450                                 }
451                                 if (message == null || message.equals("")) {
452                                         istransType = false;
453                                         message = msgRecord.getMessage();
454                                 }
455
456                                 // If filters are enabled/set, message should be in JSON format
457                                 // for filters to work for
458                                 // otherwise filter will automatically ignore message in
459                                 // non-json format.
460                                 if (filterMatches(message)) {
461                                         op.onMessage(count, message, transactionid, msgRecord.getOffset());
462                                         count++;
463
464                                 }
465
466                         }
467                 }
468                 return count;
469         }
470
471         
472
473         /**
474          * 
475          * Checks whether filter is initialized
476          */
477         
478                 
479         
480
481         /**
482          * 
483          * @param msg
484          * @return
485          */
486         private boolean filterMatches(String msg) {
487                 boolean result = true;
488                 
489                 
490                                 
491                                 
492                         
493                         
494                         
495                                 
496                         
497                                 
498                 
499         
500
501                 return result;
502         }
503
504         public DMaaPContext getDmaapContext() {
505                 return dmaapContext;
506         }
507
508         public void setDmaapContext(DMaaPContext dmaapContext) {
509                 this.dmaapContext = dmaapContext;
510         }
511
512         public Topic getTopic() {
513                 return topic;
514         }
515
516         public void setTopic(Topic topic) {
517                 this.topic = topic;
518         }
519
520         public void setTopicStyle(boolean aaftopic) {
521                 this.isAAFTopic = aaftopic;
522         }
523
524         public void setTransEnabled(boolean transEnable) {
525                 this.istransEnable = transEnable;
526         }
527
528         
529         private final Consumer fConsumer;
530         private final int fLimit;
531         private final int fTimeoutMs;
532         
533         private final boolean fPretty;
534         private final boolean fWithMeta;
535         private int fSent;
536
537         
538         private DMaaPContext dmaapContext;
539         private String responseTransactionId;
540         private Topic topic;
541         private boolean isAAFTopic = false;
542         private boolean istransEnable = false;
543         private ArrayList<Consumer> fKafkaConsumerList;
544         private boolean istransType = true;
545         // private static final Logger log =
546
547
548         private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
549
550         public int getfLimit() {
551                 return fLimit;
552         }
553
554         public int getfTimeoutMs() {
555                 return fTimeoutMs;
556         }
557
558         public boolean isfPretty() {
559                 return fPretty;
560         }
561
562         public boolean isfWithMeta() {
563                 return fWithMeta;
564         }
565
566         public boolean isAAFTopic() {
567                 return isAAFTopic;
568         }
569
570         public boolean isIstransEnable() {
571                 return istransEnable;
572         }
573
574         public boolean isIstransType() {
575                 return istransType;
576         }
577 }