93baee087e7af59dc17a839bce5b7d27cf9845ae
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / resources / streamReaders / CambriaRawStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources.streamReaders;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26
27 import javax.servlet.http.HttpServletResponse;
28
29 import org.onap.dmaap.dmf.mr.CambriaApiException;
30 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
31 import org.onap.dmaap.dmf.mr.beans.LogDetails;
32 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet.reader;
33 import com.att.nsa.util.StreamTools;
34
35 /**
36  * 
37  * This stream reader reads raw bytes creating a single message.
38  * @author peter
39  *
40  */
41 public class CambriaRawStreamReader implements reader
42 {
43         /**
44          * This is the constructor of CambriaRawStreamReader, it will basically the read from Input stream
45          * @param is
46          * @param defPart
47          * @throws CambriaApiException
48          */
49         public CambriaRawStreamReader ( InputStream is, String defPart ) throws CambriaApiException
50         {
51                 fStream = is;
52                 fDefPart = defPart;
53                 fClosed = false;
54         }
55
56         @Override
57         /**
58          * 
59          * next() method reads the bytes and
60          * iterates through the messages 
61          * @throws CambriaApiException
62          * 
63          */
64         public message next () throws CambriaApiException
65         {
66                 if ( fClosed ) return null;
67
68                 try
69                 {
70                         final byte[] rawBytes = StreamTools.readBytes ( fStream );
71                         fClosed = true;
72                         return new message ()
73                         {
74                                 private LogDetails logDetails;
75                                 private boolean transactionEnabled;
76
77                                 /**
78                                  * returns boolean value which 
79                                  * indicates whether transaction is enabled
80                                  */
81                                 public boolean isTransactionEnabled() {
82                                         return transactionEnabled;
83                                 }
84
85                                 /**
86                                  * sets boolean value which 
87                                  * indicates whether transaction is enabled
88                                  */
89                                 public void setTransactionEnabled(boolean transactionEnabled) {
90                                         this.transactionEnabled = transactionEnabled;
91                                 }
92                                 
93                                 @Override
94                                 /**
95                                  * @returns key
96                                  * It ch4ecks whether fDefPart value is Null.
97                                  * If yes, it will return ystem.currentTimeMillis () else
98                                  * it will return fDefPart variable value
99                                  */
100                                 public String getKey ()
101                                 {
102                                         return fDefPart == null ? "" + System.currentTimeMillis () : fDefPart;
103                                 }
104
105                                 @Override
106                                 /**
107                                  * returns the message in String type object
108                                  */
109                                 public String getMessage ()
110                                 {
111                                         return new String ( rawBytes );
112                                 }
113
114                                 /**
115                                  * set log details in logDetails variable
116                                  */
117                                 @Override
118                                 public void setLogDetails(LogDetails logDetails) {
119                                         this.logDetails = logDetails;
120                                 }
121
122                                 @Override
123                                 /**
124                                  * get the log details
125                                  */
126                                 public LogDetails getLogDetails() {
127                                         return this.logDetails;
128                                 }
129                         };
130                 }
131                 catch ( IOException e )
132                 {
133                         throw new CambriaApiException ( HttpServletResponse.SC_BAD_REQUEST, e.getMessage () );
134                 }
135         }
136         
137         private final InputStream fStream;
138         private final String fDefPart;
139         private boolean fClosed;
140         
141 }