DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / resources / streamReaders / CambriaStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources.streamReaders;
23
24 import org.onap.dmaap.dmf.mr.CambriaApiException;
25 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
26 import org.onap.dmaap.dmf.mr.beans.LogDetails;
27 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet.reader;
28
29 import javax.servlet.http.HttpServletResponse;
30 import java.io.IOException;
31 import java.io.InputStream;
32
33 /**
34  * Read an optionally chunked stream in the Cambria app format. This format
35  * allows for speedier server-side message parsing than pure JSON. It's looks
36  * like:<br/>
37  * <br/>
38  * &lt;keyLength&gt;.&lt;msgLength&gt;.&lt;key&gt;&lt;message&gt;<br/>
39  * <br/>
40  * Whitespace before/after each entry is ignored, so messages can be delivered
41  * with newlines between them, or not.
42  * 
43  * @author peter
44  *
45  */
46 public class CambriaStreamReader implements reader {
47         /**
48          * constructor initializing InputStream with fStream
49          * 
50          * @param senderStream
51          * @throws CambriaApiException
52          */
53         public CambriaStreamReader(InputStream senderStream) throws CambriaApiException {
54                 fStream = senderStream;
55         }
56
57         @Override
58         /**
59          * next method iterates through msg length
60          * throws IOException
61          * throws CambriaApiException
62          * 
63          */
64         public message next() throws IOException, CambriaApiException {
65                 final int keyLen = readLength();
66                 if (keyLen == -1)
67                         return null;
68
69                 final int msgLen = readLength();
70                 final String keyPart = readString(keyLen);
71                 final String msgPart = readString(msgLen);
72
73                 return new msg(keyPart, msgPart);
74         }
75
76         private static class msg implements message {
77                 /**
78                  * constructor initialization
79                  * 
80                  * @param key
81                  * @param msg
82                  */
83                 public msg(String key, String msg) {
84                         // if no key, use the current time. This allows the message to be
85                         // delivered
86                         // in any order without forcing it into a single partition as empty
87                         // string would.
88                         if (key.length() < 1) {
89                                 key = "" + System.currentTimeMillis();
90                         }
91
92                         fKey = key;
93                         fMsg = msg;
94                 }
95
96                 @Override
97                 /**
98                  * @returns fkey
99                  */
100                 public String getKey() {
101                         return fKey;
102                 }
103
104                 @Override
105                 /**
106                  * returns the message in String type object
107                  */
108                 public String getMessage() {
109                         return fMsg;
110                 }
111
112                 private final String fKey;
113                 private final String fMsg;
114                 private LogDetails logDetails;
115                 private boolean transactionEnabled;
116                 
117                 /**
118                  * returns boolean value which 
119                  * indicates whether transaction is enabled
120                  */
121                 public boolean isTransactionEnabled() {
122                         return transactionEnabled;
123                 }
124                 
125                 /**
126                  * sets boolean value which 
127                  * indicates whether transaction is enabled
128                  */
129                 public void setTransactionEnabled(boolean transactionEnabled) {
130                         this.transactionEnabled = transactionEnabled;
131                 }
132
133                 @Override
134                 /**
135                  * set log details in logDetails variable
136                  */
137                 public void setLogDetails(LogDetails logDetails) {
138                         this.logDetails = logDetails;
139                 }
140
141                 @Override
142                 /**
143                  * get the log details
144                  */
145                 public LogDetails getLogDetails() {
146                         return this.logDetails;
147                 }
148
149         }
150
151         private final InputStream fStream;
152
153         /**
154          * max cambria length indicates message length
155          
156         // This limit is here to prevent the server from spinning on a long string of numbers
157     // that is delivered with 'application/cambria' as the format. The limit needs to be
158     // large enough to support the max message length (currently 1MB, the default Kafka
159     // limit)
160     * */
161      
162     private static final int kMaxCambriaLength = 4*1000*1024;
163
164
165         /**
166          * 
167          * @return
168          * @throws IOException
169          * @throws CambriaApiException
170          */
171         private int readLength() throws IOException, CambriaApiException {
172                 // always ignore leading whitespace
173                 int c = fStream.read();
174                 while (Character.isWhitespace(c)) {
175                         c = fStream.read();
176                 }
177
178                 if (c == -1) {
179                         return -1;
180                 }
181
182                 int result = 0;
183                 while (Character.isDigit(c)) {
184                         result = (result * 10) + (c - '0');
185                         if (result > kMaxCambriaLength) {
186                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length.");
187                         }
188                         c = fStream.read();
189                 }
190
191                 if (c != '.') {
192                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length.");
193                 }
194
195                 return result;
196         }
197
198         /**
199          * 
200          * @param len
201          * @return
202          * @throws IOException
203          * @throws CambriaApiException
204          */
205         private String readString(int len) throws IOException, CambriaApiException {
206                 final byte[] buffer = new byte[len];
207
208                 final long startMs = System.currentTimeMillis();
209                 final long timeoutMs = startMs + 30000; // FIXME configurable
210
211                 int readTotal = 0;
212                 while (readTotal < len) {
213                         final int read = fStream.read(buffer, readTotal, len - readTotal);
214                         if (read == -1 || System.currentTimeMillis() > timeoutMs) {
215                                 // EOF
216                                 break;
217                         }
218                         readTotal += read;
219                 }
220
221                 if (readTotal < len) {
222                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
223                                         "End of stream while reading " + len + " bytes");
224                 }
225
226                 return new String(buffer);
227         }
228 }