3dbf339117465a29388eb75e2b38fc1588a9545d
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / resources / streamReaders / CambriaStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.resources.streamReaders;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26
27 import javax.servlet.http.HttpServletResponse;
28
29 import com.att.dmf.mr.CambriaApiException;
30 import com.att.dmf.mr.backends.Publisher.message;
31 import com.att.dmf.mr.beans.LogDetails;
32 import com.att.dmf.mr.resources.CambriaEventSet.reader;
33
34 /**
35  * Read an optionally chunked stream in the Cambria app format. This format
36  * allows for speedier server-side message parsing than pure JSON. It's looks
37  * like:<br/>
38  * <br/>
39  * &lt;keyLength&gt;.&lt;msgLength&gt;.&lt;key&gt;&lt;message&gt;<br/>
40  * <br/>
41  * Whitespace before/after each entry is ignored, so messages can be delivered
42  * with newlines between them, or not.
43  * 
44  * @author peter
45  *
46  */
47 public class CambriaStreamReader implements reader {
48         /**
49          * constructor initializing InputStream with fStream
50          * 
51          * @param senderStream
52          * @throws CambriaApiException
53          */
54         public CambriaStreamReader(InputStream senderStream) throws CambriaApiException {
55                 fStream = senderStream;
56         }
57
58         @Override
59         /**
60          * next method iterates through msg length
61          * throws IOException
62          * throws CambriaApiException
63          * 
64          */
65         public message next() throws IOException, CambriaApiException {
66                 final int keyLen = readLength();
67                 if (keyLen == -1)
68                         return null;
69
70                 final int msgLen = readLength();
71                 final String keyPart = readString(keyLen);
72                 final String msgPart = readString(msgLen);
73
74                 return new msg(keyPart, msgPart);
75         }
76
77         private static class msg implements message {
78                 /**
79                  * constructor initialization
80                  * 
81                  * @param key
82                  * @param msg
83                  */
84                 public msg(String key, String msg) {
85                         // if no key, use the current time. This allows the message to be
86                         // delivered
87                         // in any order without forcing it into a single partition as empty
88                         // string would.
89                         if (key.length() < 1) {
90                                 key = "" + System.currentTimeMillis();
91                         }
92
93                         fKey = key;
94                         fMsg = msg;
95                 }
96
97                 @Override
98                 /**
99                  * @returns fkey
100                  */
101                 public String getKey() {
102                         return fKey;
103                 }
104
105                 @Override
106                 /**
107                  * returns the message in String type object
108                  */
109                 public String getMessage() {
110                         return fMsg;
111                 }
112
113                 private final String fKey;
114                 private final String fMsg;
115                 private LogDetails logDetails;
116                 private boolean transactionEnabled;
117                 
118                 /**
119                  * returns boolean value which 
120                  * indicates whether transaction is enabled
121                  */
122                 public boolean isTransactionEnabled() {
123                         return transactionEnabled;
124                 }
125                 
126                 /**
127                  * sets boolean value which 
128                  * indicates whether transaction is enabled
129                  */
130                 public void setTransactionEnabled(boolean transactionEnabled) {
131                         this.transactionEnabled = transactionEnabled;
132                 }
133
134                 @Override
135                 /**
136                  * set log details in logDetails variable
137                  */
138                 public void setLogDetails(LogDetails logDetails) {
139                         this.logDetails = logDetails;
140                 }
141
142                 @Override
143                 /**
144                  * get the log details
145                  */
146                 public LogDetails getLogDetails() {
147                         return this.logDetails;
148                 }
149
150         }
151
152         private final InputStream fStream;
153
154         /**
155          * max cambria length indicates message length
156          
157         // This limit is here to prevent the server from spinning on a long string of numbers
158     // that is delivered with 'application/cambria' as the format. The limit needs to be
159     // large enough to support the max message length (currently 1MB, the default Kafka
160     // limit)
161     * */
162      
163     private static final int kMaxCambriaLength = 4*1000*1024;
164
165
166         /**
167          * 
168          * @return
169          * @throws IOException
170          * @throws CambriaApiException
171          */
172         private int readLength() throws IOException, CambriaApiException {
173                 // always ignore leading whitespace
174                 int c = fStream.read();
175                 while (Character.isWhitespace(c)) {
176                         c = fStream.read();
177                 }
178
179                 if (c == -1) {
180                         return -1;
181                 }
182
183                 int result = 0;
184                 while (Character.isDigit(c)) {
185                         result = (result * 10) + (c - '0');
186                         if (result > kMaxCambriaLength) {
187                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length.");
188                         }
189                         c = fStream.read();
190                 }
191
192                 if (c != '.') {
193                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected . after length.");
194                 }
195
196                 return result;
197         }
198
199         /**
200          * 
201          * @param len
202          * @return
203          * @throws IOException
204          * @throws CambriaApiException
205          */
206         private String readString(int len) throws IOException, CambriaApiException {
207                 final byte[] buffer = new byte[len];
208
209                 final long startMs = System.currentTimeMillis();
210                 final long timeoutMs = startMs + 30000; // FIXME configurable
211
212                 int readTotal = 0;
213                 while (readTotal < len) {
214                         final int read = fStream.read(buffer, readTotal, len - readTotal);
215                         if (read == -1 || System.currentTimeMillis() > timeoutMs) {
216                                 // EOF
217                                 break;
218                         }
219                         readTotal += read;
220                 }
221
222                 if (readTotal < len) {
223                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
224                                         "End of stream while reading " + len + " bytes");
225                 }
226
227                 return new String(buffer);
228         }
229 }