DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / resources / CambriaOutboundEventStream.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources;
23
import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.json.JSONException;
import org.json.JSONObject;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
import org.onap.dmaap.dmf.mr.utils.Utils;

import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
42
43
44 /**
45  * class used to write the consumed messages
46  * 
47  * @author anowarul.islam
48  *
49  */
50 public class CambriaOutboundEventStream implements StreamWriter {
        // Number of messages delivered per forEachMessage() call when the
        // configured limit is 0.
        private static final int kTopLimit = 1024 * 4;
52
53         /**
54          * 
55          * static innerclass it takes all the input parameter for kafka consumer
56          * like limit, timeout, meta, pretty
57          * 
58          * @author anowarul.islam
59          *
60          */
61         public static class Builder {
62
63                 // Required
64                 private final Consumer fConsumer;
65                 // private final rrNvReadable fSettings; // used during write to tweak
66                 // format, decide to explicitly
67                 // close stream or not
68
69                 // Optional
70                 private int fLimit;
71                 private int fTimeoutMs;
72                 private String fTopicFilter;
73                 private boolean fPretty;
74                 private boolean fWithMeta;
75                 ArrayList<Consumer> fKafkaConsumerList;
76
77                 
78                 /**
79                  * constructor it initializes all the consumer parameters
80                  * 
81                  * @param c
82                  * @param settings
83                  */
84                 public Builder(Consumer c) {
85                         this.fConsumer = c;
86                         
87
88                         fLimit = CambriaConstants.kNoTimeout;
89                         fTimeoutMs = CambriaConstants.kNoLimit;
90                         fTopicFilter = CambriaConstants.kNoFilter;
91                         fPretty = false;
92                         fWithMeta = false;
93                         
94         
95                 }
96
97                 /**
98                  * 
99                  * constructor initializes with limit
100                  * 
101                  * @param l
102                  *            only l no of messages will be consumed
103                  * @return
104                  */
105                 public Builder limit(int l) {
106                         this.fLimit = l;
107                         return this;
108                 }
109
110                 /**
111                  * constructor initializes with timeout
112                  * 
113                  * @param t
114                  *            if there is no message to consume, them DMaaP will wait
115                  *            for t time
116                  * @return
117                  */
118                 public Builder timeout(int t) {
119                         this.fTimeoutMs = t;
120                         return this;
121                 }
122
123                 /**
124                  * constructor initializes with filter
125                  * 
126                  * @param f
127                  *            filter
128                  * @return
129                  */
130                 public Builder filter(String f) {
131                         this.fTopicFilter = f;
132                         return this;
133                 }
134
135                 /**
136                  * constructor initializes with boolean value pretty
137                  * 
138                  * @param p
139                  *            messages print in new line
140                  * @return
141                  */
142                 public Builder pretty(boolean p) {
143                         fPretty = p;
144                         return this;
145                 }
146
147                 /**
148                  * constructor initializes with boolean value meta
149                  * 
150                  * @param withMeta,
151                  *            along with messages offset will print
152                  * @return
153                  */
154                 public Builder withMeta(boolean withMeta) {
155                         fWithMeta = withMeta;
156                         return this;
157                 }
158
159                 // public Builder atOffset ( int pos )
160                 
161         
162                 
163                 // }
164                 /**
165                  * method returs object of CambriaOutboundEventStream
166                  * 
167                  * @return
168                  * @throws CambriaApiException
169                  */
170                 public CambriaOutboundEventStream build() throws CambriaApiException {
171                         return new CambriaOutboundEventStream(this);
172                 }
173         }
174
175         @SuppressWarnings("unchecked")
176         /**
177          * 
178          * @param builder
179          * @throws CambriaApiException
180          * 
181          */
182         private CambriaOutboundEventStream(Builder builder) throws CambriaApiException {
183                 fConsumer = builder.fConsumer;
184                 fLimit = builder.fLimit;
185                 fTimeoutMs = builder.fTimeoutMs;
186                 
187                 fSent = 0;
188                 fPretty = builder.fPretty;
189                 fWithMeta = builder.fWithMeta;
190                 fKafkaConsumerList = builder.fKafkaConsumerList;
191         
192                         
193                         
194                 
195                         
196                                 
197                                 
198                                 
199                                 
200                                 
201                         
202                                 
203                         
204                                                 
205                         
206                                 
207                                 
208                                         
209                 
210         
211         }
212
213         /**
214          * 
215          * interface provides onWait and onMessage methods
216          *
217          */
218         public interface operation {
219                 /**
220                  * Call thread.sleep
221                  * 
222                  * @throws IOException
223                  */
224                 void onWait() throws IOException;
225
226                 /**
227                  * provides the output based in the consumer paramter
228                  * 
229                  * @param count
230                  * @param msg
231                  * @throws IOException
232                  */
233                 
234                 void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
235         }
236
237         /**
238          * 
239          * @return
240          */
241         public int getSentCount() {
242                 return fSent;
243         }
244
245         @Override
246         /**
247          * 
248          * @param os
249          *            throws IOException
250          */
251         public void write(final OutputStream os) throws IOException {
252                 
253         
254                 
255                 // synchronized(this){
256                 os.write('[');
257                 fSent = forEachMessage(new operation() {
258                         @Override
259                         public void onMessage(int count, String msg, String transId, long offSet)
260                                         throws IOException, JSONException {
261
262                                 if (count > 0) {
263                                         os.write(',');
264                                 }
265                                 if (fWithMeta) {
266                                         final JSONObject entry = new JSONObject();
267                                         entry.put("offset", offSet);
268                                         entry.put("message", msg);
269                                         os.write(entry.toString().getBytes());
270                                 } else {
271                                         
272                                                 String jsonString = JSONObject.valueToString(msg);
273                                         os.write(jsonString.getBytes());
274                                 }
275
276                                 if (fPretty) {
277                                         os.write('\n');
278                                 }
279
280                                 String metricTopicname = AJSCPropertiesMap
281                                                 .getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
282                                 if (null == metricTopicname)
283                                         metricTopicname = "msgrtr.apinode.metrics.dmaap";
284                                 if (!metricTopicname.equalsIgnoreCase(topic.getName())) {
285                                         try {
286                                                 if (istransEnable && istransType) {
287                                                         // final String transactionId =
288                                                         
289                                                         
290                                                         StringBuilder consumerInfo = new StringBuilder();
291                                                         if (null != dmaapContext && null != dmaapContext.getRequest()) {
292                                                                 final HttpServletRequest request = dmaapContext.getRequest();
293                                                                 consumerInfo.append("consumerIp= \"" + request.getRemoteHost() + "\",");
294                                                                 consumerInfo.append("consServerIp= \"" + request.getLocalAddr() + "\",");
295                                                                 consumerInfo.append("consumerId= \"" + Utils.getUserApiKey(request) + "\",");
296                                                                 consumerInfo.append("consumerGroup= \""
297                                                                                 + getConsumerGroupFromRequest(request.getRequestURI()) + "\",");
298                                                                 consumerInfo.append("consumeTime= \"" + Utils.getFormattedDate(new Date()) + "\",");
299                                                         }
300                                                         log.info("Consumer [" + consumerInfo.toString() + "transactionId= \"" + transId
301                                                                         + "\",messageLength= \"" + msg.length() + "\",topic= \"" + topic.getName() + "\"]");
302                                                 }
303                                         } catch (Exception e) {
304                                         }
305                                 }
306
307                         }
308
309                         @Override
310                         /**
311                          * 
312                          * It makes thread to wait
313                          * 
314                          * @throws IOException
315                          */
316                         public void onWait() throws IOException {
317                                 os.flush(); // likely totally unnecessary for a network socket
318                                 try {
319                                         // FIXME: would be good to wait/signal
320                                         Thread.sleep(100);
321                                 } catch (InterruptedException e) {
322                                     Thread.currentThread().interrupt();
323                                 }
324                         }
325                 });
326
327                 
328                 if (null != dmaapContext && istransEnable && istransType) {
329
330                         dmaapContext.getResponse().setHeader("transactionId",
331                                         Utils.getResponseTransactionId(responseTransactionId));
332                 }
333
334                 os.write(']');
335                 os.flush();
336
337                 boolean close_out_stream = true;
338                 String strclose_out_stream = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "close.output.stream");
339                 if (null != strclose_out_stream)
340                         close_out_stream = Boolean.parseBoolean(strclose_out_stream);
341
342                 
343                 if (close_out_stream) {
344                         os.close();
345                         
346                 }
347         }
348
349         /**
350          * 
351          * @param requestURI
352          * @return
353          */
354         private String getConsumerGroupFromRequest(String requestURI) {
355                 if (null != requestURI && !requestURI.isEmpty()) {
356
357                         String consumerDetails = requestURI.substring(requestURI.indexOf("events/") + 7);
358
359                         int startIndex = consumerDetails.indexOf("/") + 1;
360                         int endIndex = consumerDetails.lastIndexOf("/");
361                         return consumerDetails.substring(startIndex, endIndex);
362                 }
363                 return null;
364         }
365
366         /**
367          * 
368          * @param op
369          * @return
370          * @throws IOException
371          * @throws JSONException
372          */
373         public int forEachMessage(operation op) throws IOException, JSONException {
374                 final int effectiveLimit = (fLimit == 0 ? kTopLimit : fLimit);
375
376                 int count = 0;
377                 boolean firstPing = true;
378                 // boolean isTransType=false;
379                 final long startMs = System.currentTimeMillis();
380                 final long timeoutMs = fTimeoutMs + startMs -500; //500 ms used in poll 
381
382                 while (firstPing || (count == 0 && System.currentTimeMillis() < timeoutMs)) {
383                         if (!firstPing) {
384                                 op.onWait();
385                         }
386                         firstPing = false;
387
388                 
389                                  Consumer.Message msgRecord = null;
390                                  while (count < effectiveLimit && (msgRecord =
391                                  fConsumer.nextMessage()) != null) {
392
393                                 String message = "";
394                                 String transactionid = "";
395                                 try {
396                    // String msgRecord = msg;
397                                         JSONObject jsonMessage = new JSONObject(msgRecord);
398                                         String[] keys = JSONObject.getNames(jsonMessage);
399                                         boolean wrapheader1 = false;
400                                         boolean wrapheader2 = false;
401                                         boolean found_attr3 = false;
402                                         String wrapElement1 = "message";
403                                         String wrapElement2 = "msgWrapMR";
404                                         String transIdElement = "transactionId";
405                                         if (null != keys) {
406                                                 for (String key : keys) {
407                                                         if (key.equals(wrapElement1)) {
408                                                                 wrapheader1 = true;
409                                                         } else if (key.equals(wrapElement2)) {
410                                                                 wrapheader2 = true;
411                                                         } else if (key.equals(transIdElement)) {
412                                                                 found_attr3 = true;
413                                                                 transactionid = jsonMessage.getString(key);
414                                                         }
415                                                 }
416                                         }
417
418                                         // returns contents of attribute 1 if both attributes
419                                         // present, otherwise
420                                         // the whole msg
421                                         if (wrapheader2 && found_attr3) {
422                                                 message = jsonMessage.getString(wrapElement2);
423                                         } else if (wrapheader1 && found_attr3) {
424                                                 message = jsonMessage.getString(wrapElement1);
425                                         } else {
426                                                 message = msgRecord.getMessage();
427                                         }
428                                         // jsonMessage = extractMessage(jsonMessage ,
429                                         // "message","msgWrapMR","transactionId");
430                                         istransType = true;
431                                 } catch (JSONException e) { // This check is required for the
432                                                                                         // message sent by MR AAF flow but
433                                                                                         // consumed by UEB ACL flow which
434                                                                                         // wont expect transaction id in
435                                                                                         // cambria client api
436                                         // Ignore
437                                         log.info("JSON Exception logged when the message is non JSON Format");
438                                 } catch (Exception exp) {
439                                         log.info("****Some Exception occured for writing messages in topic" + topic.getName()
440                                                         + "  Exception" + exp);
441                                 }
442                                 if (message == null || message.equals("")) {
443                                         istransType = false;
444                                         message = msgRecord.getMessage();
445                                 }
446
447                                 // If filters are enabled/set, message should be in JSON format
448                                 // for filters to work for
449                                 // otherwise filter will automatically ignore message in
450                                 // non-json format.
451                                 if (filterMatches(message)) {
452                                         op.onMessage(count, message, transactionid, msgRecord.getOffset());
453                                         count++;
454
455                                 }
456
457                         }
458                 }
459                 return count;
460         }
461
462         
463
464         /**
465          * 
466          * Checks whether filter is initialized
467          */
468         
469                 
470         
471
472         /**
473          * 
474          * @param msg
475          * @return
476          */
477         private boolean filterMatches(String msg) {
478                 boolean result = true;
479                 
480                 
481                                 
482                                 
483                         
484                         
485                         
486                                 
487                         
488                                 
489                 
490         
491
492                 return result;
493         }
494
495         public DMaaPContext getDmaapContext() {
496                 return dmaapContext;
497         }
498
499         public void setDmaapContext(DMaaPContext dmaapContext) {
500                 this.dmaapContext = dmaapContext;
501         }
502
503         public Topic getTopic() {
504                 return topic;
505         }
506
507         public void setTopic(Topic topic) {
508                 this.topic = topic;
509         }
510
511         public void setTopicStyle(boolean aaftopic) {
512                 this.isAAFTopic = aaftopic;
513         }
514
515         public void setTransEnabled(boolean transEnable) {
516                 this.istransEnable = transEnable;
517         }
518
519         
        // Source of the messages polled in forEachMessage().
        private final Consumer fConsumer;
        // Per-call message limit; 0 makes forEachMessage() use kTopLimit instead.
        private final int fLimit;
        // Timeout used by forEachMessage() to compute its polling deadline.
        private final int fTimeoutMs;
        
        // When true, write() emits a newline after each message.
        private final boolean fPretty;
        // When true, write() emits {"offset", "message"} objects instead of bare text.
        private final boolean fWithMeta;
        // Count returned by forEachMessage() during the latest write(); starts at 0.
        private int fSent;

        
        // Request/response context; read by write() for log fields and the response header.
        private DMaaPContext dmaapContext;
        // Not assigned anywhere in this class; passed to Utils.getResponseTransactionId() in write().
        private String responseTransactionId;
        // Topic whose name is compared with the metrics topic and included in log lines.
        private Topic topic;
        // Only stored and returned by this class.
        private boolean isAAFTopic = false;
        // Gates the per-message transaction log line and the transactionId header in write().
        private boolean istransEnable = false;
        // Copied from the builder; not read anywhere in this class.
        private ArrayList<Consumer> fKafkaConsumerList;
        // Updated for every message processed by forEachMessage().
        private boolean istransType = true;


        private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
540
541         public int getfLimit() {
542                 return fLimit;
543         }
544
545         public int getfTimeoutMs() {
546                 return fTimeoutMs;
547         }
548
549         public boolean isfPretty() {
550                 return fPretty;
551         }
552
553         public boolean isfWithMeta() {
554                 return fWithMeta;
555         }
556
557         public boolean isAAFTopic() {
558                 return isAAFTopic;
559         }
560
561         public boolean isIstransEnable() {
562                 return istransEnable;
563         }
564
565         public boolean isIstransType() {
566                 return istransType;
567         }
568 }